Files
eve-clickhouse-scraper/main.go
2026-01-17 13:08:08 +01:00

254 lines
6.5 KiB
Go

package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"sync/atomic"
"time"
logger "git.site.quack-lab.dev/dave/cylogger"
utils "git.site.quack-lab.dev/dave/cyutils"
"github.com/joho/godotenv"
)
// Package-level configuration and state shared by the scraper.
var (
	// ESIUrl is the market orders endpoint that is requested page by page.
	ESIUrl = "https://esi.evetech.net/markets/10000002/orders"

	// clickhouseURL is the ClickHouse HTTP endpoint every query is POSTed to.
	clickhouseURL = "http://mclickhouse.site.quack-lab.dev"

	// totalPages holds the page count most recently read from the X-Pages
	// response header; it is 1 until a page has been fetched.
	totalPages = 1

	// clickhouseUser and clickhousePassword are the basic-auth credentials
	// for ClickHouse. They are filled in from the environment by main.
	clickhouseUser     string
	clickhousePassword string
)
// main reads the ClickHouse credentials, initializes the market_orders table
// and then serves the /process endpoint on port 8090.
//
// The credentials are taken from the CLICKHOUSE_USER and CLICKHOUSE_PASSWORD
// environment variables (godotenv.Load is given a chance to populate them
// first). When either is empty, main logs an error and returns without
// starting the server.
func main() {
	flag.Parse()
	logger.InitFlag()

	// Best effort: the error is ignored on purpose because the credentials may
	// come straight from the process environment; they are validated below.
	_ = godotenv.Load()

	clickhouseUser = os.Getenv("CLICKHOUSE_USER")
	clickhousePassword = os.Getenv("CLICKHOUSE_PASSWORD")
	if clickhouseUser == "" || clickhousePassword == "" {
		logger.Error("CLICKHOUSE_USER and CLICKHOUSE_PASSWORD must be set")
		return
	}
	logger.Info("CLICKHOUSE_USER: %s", clickhouseUser)
	// Only confirm that a password is configured. No part of it is logged:
	// even a short prefix would reveal a short password entirely.
	logger.Info("CLICKHOUSE_PASSWORD: [redacted]")

	// A failed initialization is logged, but the server is started anyway.
	if err := InitClickhouseTable(); err != nil {
		logger.Error("Failed to initialize ClickHouse table: %v", err)
	}

	http.HandleFunc("/process", processHandler)
	port := "8090"
	server := &http.Server{
		// A nil Handler makes the server use http.DefaultServeMux, where
		// processHandler was registered above.
		Addr: ":" + port,
		// Bound how long a client may take to send its request headers so
		// stalled connections cannot pile up.
		ReadHeaderTimeout: 10 * time.Second,
	}
	logger.Info("Starting server on port %s", port)
	if err := server.ListenAndServe(); err != nil {
		logger.Error("Failed to start server: %v", err)
	}
}
// isRunning is 1 while a run started by processHandler is in progress and 0
// otherwise. processHandler reads and writes it through sync/atomic.
var isRunning uint32

// processHandler launches a market data run in the background unless one is
// already in progress. It replies "Started" when a new run was launched and
// "Already running" otherwise; it never waits for the run to finish.
func processHandler(w http.ResponseWriter, r *http.Request) {
	claimed := atomic.CompareAndSwapUint32(&isRunning, 0, 1)
	if !claimed {
		w.Write([]byte("Already running"))
		return
	}

	run := func() {
		// Release the flag when the run ends, whatever its outcome, so the
		// next request can start a new run.
		defer atomic.StoreUint32(&isRunning, 0)

		logger.Info("Starting market data processing")
		err := ProcessMarketData()
		if err == nil {
			logger.Info("Market data processing completed")
			return
		}
		logger.Error("Failed to process market data: %v", err)
	}
	go run()

	w.Write([]byte("Started"))
}
// InitClickhouseTable creates the market_orders table and its secondary
// indices when they do not exist yet.
//
// A failure to create the table is returned to the caller. A failure to
// create an index is only logged as a warning and the remaining indices are
// still attempted.
func InitClickhouseTable() error {
	const createTableQuery = `
CREATE TABLE IF NOT EXISTS market_orders (
order_id UInt64,
type_id UInt64,
issued DateTime,
price UInt64,
volume_remain UInt64,
volume_total UInt64,
is_buy_order Bool,
inserted_at DateTime
)
ENGINE = MergeTree()
ORDER BY (type_id, issued, order_id)
PARTITION BY toYYYYMM(issued)
SETTINGS index_granularity = 8192
`

	logger.Info("Creating ClickHouse table...")
	if err := executeClickhouseQuery(createTableQuery); err != nil {
		return fmt.Errorf("failed to create table: %w", err)
	}

	// Indices for common query patterns: price, buy/sell side and insertion time.
	indexQueries := [...]string{
		"CREATE INDEX IF NOT EXISTS idx_price_skip_index ON market_orders (price, type_id) TYPE minmax GRANULARITY 4",
		"CREATE INDEX IF NOT EXISTS idx_buy_order_skip_index ON market_orders (is_buy_order, type_id) TYPE set(4) GRANULARITY 4",
		"CREATE INDEX IF NOT EXISTS idx_inserted_at_skip_index ON market_orders (inserted_at) TYPE minmax GRANULARITY 4",
	}

	logger.Info("Creating ClickHouse indices...")
	for i := range indexQueries {
		err := executeClickhouseQuery(indexQueries[i])
		if err == nil {
			continue
		}
		logger.Warning("Failed to create index (may already exist): %v", err)
	}

	logger.Info("ClickHouse table initialization complete")
	return nil
}
// executeClickhouseQuery POSTs query as plain text to clickhouseURL,
// authenticating with clickhouseUser and clickhousePassword via HTTP basic
// auth.
//
// It returns nil when the server answers with a 2xx status. Request and
// transport errors are returned unchanged; any other status produces an error
// that carries the HTTP status line and the response body. Each request is
// given at most 30 seconds.
func executeClickhouseQuery(query string) error {
	req, err := http.NewRequest(http.MethodPost, clickhouseURL, bytes.NewBufferString(query))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "text/plain")
	req.SetBasicAuth(clickhouseUser, clickhousePassword)

	// Without a timeout an unresponsive server would block the caller forever.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return fmt.Errorf("ClickHouse query failed: %s (reading response body: %v)", resp.Status, readErr)
		}
		return fmt.Errorf("ClickHouse query failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
	}

	// Drain the body so the underlying connection can be reused. A failure
	// here does not change the outcome of the query, so it is ignored.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
// ProcessMarketData downloads the market order pages served at ESIUrl and
// stores them in ClickHouse through InsertClickhouse.
//
// The first page is requested up front to learn the page count. The page
// numbers are then handed to utils.WithWorkers with a worker count of 8, and
// each callback fetches and inserts one page. Every row written by one call
// carries the same inserted_at timestamp. A page that cannot be fetched or
// inserted is logged and skipped so the other pages are still processed; when
// at least one page failed, the returned error reports how many.
func ProcessMarketData() error {
	// Generate a single timestamp for this entire run.
	// NOTE(review): this is the local wall-clock time of the scraper, while
	// InsertClickhouse formats issued without converting its zone. Confirm
	// both agree with the time zone ClickHouse assumes for DateTime strings.
	runTimestamp := time.Now().Format("2006-01-02 15:04:05")
	logger.Info("Run timestamp: %s", runTimestamp)

	firstPage, pageCount, err := fetchMarketPageWithCount(1)
	if err != nil {
		return fmt.Errorf("failed to fetch market page %d: %w", 1, err)
	}
	// totalPages is written here, before any worker exists; the workers below
	// never write it, so there is no concurrent access to it.
	totalPages = pageCount
	// Log the size of the page, not its content: dumping every order would
	// flood the log.
	logger.Info("Market data: %d orders on page 1", len(firstPage))
	logger.Info("Total pages: %d", pageCount)

	pages := make([]int, pageCount)
	for i := range pages {
		pages[i] = i + 1
	}

	// failedPages is updated from the worker callbacks, hence the atomics.
	var failedPages int64

	// NOTE(review): as before, the page number is taken from the callback's
	// second argument. Confirm against utils.WithWorkers that this is the
	// slice element and not its index; if it is the index, page 0 is requested
	// and the last page is never fetched.
	utils.WithWorkers(8, pages, func(worker int, page int, value int) {
		flog := logger.Default.
			WithPrefix(fmt.Sprintf("Worker %d", worker)).
			WithPrefix(fmt.Sprintf("Page %d", page))

		flog.Info("Fetching market data")
		marketData, _, err := fetchMarketPageWithCount(page)
		if err != nil {
			atomic.AddInt64(&failedPages, 1)
			flog.Error(fmt.Sprintf("Failed to fetch market data: %v", err))
			return
		}

		flog.Info("Inserting market data")
		if err := InsertClickhouse(marketData, runTimestamp); err != nil {
			atomic.AddInt64(&failedPages, 1)
			flog.Error(fmt.Sprintf("Failed to insert market data: %v", err))
		}
	})

	// NOTE(review): assumes utils.WithWorkers returns only after every
	// callback has finished; otherwise this count is incomplete.
	if failed := atomic.LoadInt64(&failedPages); failed > 0 {
		return fmt.Errorf("%d of %d market pages failed", failed, pageCount)
	}
	return nil
}

// esiRequestTimeout bounds a single page request so that an unresponsive
// server cannot block a caller forever.
const esiRequestTimeout = 30 * time.Second

// fetchMarketPageWithCount requests one page of market orders from ESIUrl and
// returns the decoded orders together with the page count announced in the
// X-Pages response header.
//
// It writes no package-level state, so concurrent callers do not interfere
// with each other. On any error the returned orders are the zero value and the
// count is 0.
func fetchMarketPageWithCount(page int) (MarketResponse, int, error) {
	var none MarketResponse

	client := &http.Client{Timeout: esiRequestTimeout}
	resp, err := client.Get(fmt.Sprintf("%s?page=%d", ESIUrl, page))
	if err != nil {
		return none, 0, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return none, 0, err
	}

	// An error response carries neither orders nor a usable X-Pages header;
	// report the status instead of a confusing parse failure.
	if resp.StatusCode != http.StatusOK {
		return none, 0, fmt.Errorf("unexpected status %s: %s", resp.Status, strings.TrimSpace(string(body)))
	}

	pageCount, err := strconv.Atoi(resp.Header.Get("X-Pages"))
	if err != nil {
		return none, 0, fmt.Errorf("invalid X-Pages header: %w", err)
	}
	if pageCount < 0 {
		// A negative count would make the caller's make([]int, n) panic.
		return none, 0, fmt.Errorf("invalid X-Pages header: %d", pageCount)
	}

	var orders MarketResponse
	if err := json.Unmarshal(body, &orders); err != nil {
		return none, 0, fmt.Errorf("decoding market orders: %w", err)
	}
	return orders, pageCount, nil
}

// FetchMarketPage requests one page of market orders from ESIUrl and returns
// it.
//
// As a side effect it stores the page count reported in the X-Pages header in
// totalPages. That write is not synchronized, so FetchMarketPage must not be
// called from concurrent goroutines; fetchMarketPageWithCount is the variant
// without the side effect.
func FetchMarketPage(page int) (MarketResponse, error) {
	marketData, pageCount, err := fetchMarketPageWithCount(page)
	if err != nil {
		return marketData, err
	}
	totalPages = pageCount
	return marketData, nil
}
// InsertClickhouse writes every order in marketData to the market_orders
// table with one batched INSERT, stamping each row with runTimestamp as its
// inserted_at value.
//
// Prices are multiplied by 100 and written as whole numbers, so two decimal
// places survive in the integer price column. An empty marketData is a no-op.
// Errors from executeClickhouseQuery are returned wrapped.
//
// runTimestamp is embedded in the statement verbatim; it must come from
// trusted code and use the "2006-01-02 15:04:05" layout, like issued.
func InsertClickhouse(marketData MarketResponse, runTimestamp string) error {
	if len(marketData) == 0 {
		// Nothing to store: do not send an INSERT without any row.
		return nil
	}

	var query strings.Builder
	query.WriteString("INSERT INTO market_orders (order_id,type_id,issued,price,volume_remain,volume_total,is_buy_order,inserted_at) VALUES ")
	for i, order := range marketData {
		if i > 0 {
			query.WriteString(", ")
		}
		// Format issued as 'YYYY-MM-DD HH:MM:SS'.
		issued := order.Issued.Format("2006-01-02 15:04:05")
		// %.0f rounds the scaled price to a whole number (1.15*100 is
		// 114.99999999999999 as a float but is written as 115), so the
		// literal matches the integer column instead of carrying a fraction.
		fmt.Fprintf(&query, "(%d, %d, '%s', %.0f, %d, %d, %t, '%s')",
			order.OrderID, order.TypeID, issued, order.Price*100,
			order.VolumeRemain, order.VolumeTotal, order.IsBuyOrder, runTimestamp)
	}

	if err := executeClickhouseQuery(query.String()); err != nil {
		return fmt.Errorf("ClickHouse insert failed: %w", err)
	}
	return nil
}