package main import ( "bytes" "encoding/json" "flag" "fmt" "io" "net/http" "os" "strconv" "strings" "sync/atomic" "time" logger "git.site.quack-lab.dev/dave/cylogger" utils "git.site.quack-lab.dev/dave/cyutils" "github.com/joho/godotenv" ) var ESIUrl = "https://esi.evetech.net/markets/10000002/orders" var clickhouseURL = "http://mclickhouse.site.quack-lab.dev" var totalPages = 1 var clickhouseUser string var clickhousePassword string func main() { flag.Parse() logger.InitFlag() godotenv.Load() clickhouseUser = os.Getenv("CLICKHOUSE_USER") clickhousePassword = os.Getenv("CLICKHOUSE_PASSWORD") if clickhouseUser == "" || clickhousePassword == "" { logger.Error("CLICKHOUSE_USER and CLICKHOUSE_PASSWORD must be set") return } logger.Info("CLICKHOUSE_USER: %s", clickhouseUser) shortPw := clickhousePassword if len(shortPw) > 4 { shortPw = shortPw[:4] } logger.Info("CLICKHOUSE_PASSWORD: %s", shortPw) // Initialize ClickHouse table if err := InitClickhouseTable(); err != nil { logger.Error("Failed to initialize ClickHouse table: %v", err) } http.HandleFunc("/process", processHandler) port := "8090" logger.Info("Starting server on port %s", port) if err := http.ListenAndServe(":"+port, nil); err != nil { logger.Error("Failed to start server: %v", err) } } var isRunning uint32 func processHandler(w http.ResponseWriter, r *http.Request) { if !atomic.CompareAndSwapUint32(&isRunning, 0, 1) { w.Write([]byte("Already running")) return } go func() { defer atomic.StoreUint32(&isRunning, 0) logger.Info("Starting market data processing") if err := ProcessMarketData(); err != nil { logger.Error("Failed to process market data: %v", err) } else { logger.Info("Market data processing completed") } }() w.Write([]byte("Started")) } func InitClickhouseTable() error { query := ` CREATE TABLE IF NOT EXISTS market_orders ( order_id UInt64, type_id UInt64, issued DateTime, price UInt64, volume_remain UInt64, volume_total UInt64, is_buy_order Bool, inserted_at DateTime ) ENGINE = MergeTree() ORDER BY (type_id, issued, order_id) PARTITION BY toYYYYMM(issued) SETTINGS index_granularity = 8192 ` // Create indices for common query patterns indices := []string{ "CREATE INDEX IF NOT EXISTS idx_price_skip_index ON market_orders (price, type_id) TYPE minmax GRANULARITY 4", "CREATE INDEX IF NOT EXISTS idx_buy_order_skip_index ON market_orders (is_buy_order, type_id) TYPE set(4) GRANULARITY 4", "CREATE INDEX IF NOT EXISTS idx_inserted_at_skip_index ON market_orders (inserted_at) TYPE minmax GRANULARITY 4", } // Create table logger.Info("Creating ClickHouse table...") if err := executeClickhouseQuery(query); err != nil { return fmt.Errorf("failed to create table: %w", err) } // Create indices logger.Info("Creating ClickHouse indices...") for _, idx := range indices { if err := executeClickhouseQuery(idx); err != nil { logger.Warning("Failed to create index (may already exist): %v", err) } } logger.Info("ClickHouse table initialization complete") return nil } func executeClickhouseQuery(query string) error { req, err := http.NewRequest("POST", clickhouseURL, bytes.NewReader([]byte(query))) if err != nil { return err } req.Header.Set("Content-Type", "text/plain") req.SetBasicAuth(clickhouseUser, clickhousePassword) client := &http.Client{} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode > 299 { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("ClickHouse query failed: %s", body) } return nil } func ProcessMarketData() error { // Generate a single timestamp for this entire run runTimestamp := time.Now().Format("2006-01-02 15:04:05") logger.Info("Run timestamp: %s", runTimestamp) marketData, err := FetchMarketPage(1) if err != nil { return fmt.Errorf("failed to fetch market page %d: %w", 1, err) } logger.Info("Market data: %+v", marketData) logger.Info("Total pages: %d", totalPages) pages := make([]int, totalPages) for i := 1; i <= totalPages; i++ { pages[i-1] = i } utils.WithWorkers(8, pages, func(worker int, page int, value int) { flog := logger.Default. WithPrefix(fmt.Sprintf("Worker %d", worker)). WithPrefix(fmt.Sprintf("Page %d", page)) flog.Info("Fetching market data") marketData, err := FetchMarketPage(page) if err != nil { flog.Error(fmt.Sprintf("Failed to fetch market data: %v", err)) return } flog.Info("Inserting market data") err = InsertClickhouse(marketData, runTimestamp) if err != nil { flog.Error(fmt.Sprintf("Failed to insert market data: %v", err)) return } }) return nil } func FetchMarketPage(page int) (MarketResponse, error) { var marketData MarketResponse url := fmt.Sprintf("%s?page=%d", ESIUrl, page) resp, err := http.Get(url) if err != nil { return marketData, err } defer resp.Body.Close() totalPagesStr := resp.Header.Get("X-Pages") totalPagesInt, err := strconv.Atoi(totalPagesStr) if err != nil { return marketData, err } totalPages = totalPagesInt body, err := io.ReadAll(resp.Body) if err != nil { return marketData, err } err = json.Unmarshal(body, &marketData) if err != nil { return marketData, err } return marketData, nil } func InsertClickhouse(marketData MarketResponse, runTimestamp string) error { // Build ClickHouse batch INSERT query var insertParts = []string{ "INSERT INTO market_orders (order_id,type_id,issued,price,volume_remain,volume_total,is_buy_order,inserted_at)", "VALUES", } values := make([]string, 0, len(marketData)) for _, order := range marketData { // Format issued as 'YYYY-MM-DD HH:MM:SS' issued := order.Issued.Format("2006-01-02 15:04:05") order.Price *= 100 // Float -> int, mISK -> ISK val := fmt.Sprintf("(%d, %d, '%s', %f, %d, %d, %t, '%s')", order.OrderID, order.TypeID, issued, order.Price, order.VolumeRemain, order.VolumeTotal, order.IsBuyOrder, runTimestamp) values = append(values, val) } query := fmt.Sprintf("%s %s", insertParts[0], insertParts[1]) + " " + strings.Join(values, ", ") req, err := http.NewRequest("POST", clickhouseURL, strings.NewReader(query)) if err != nil { return err } req.Header.Set("Content-Type", "text/plain") req.SetBasicAuth(clickhouseUser, clickhousePassword) client := &http.Client{} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode > 299 { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("ClickHouse insert failed: %s", body) } return nil }