254 lines
6.5 KiB
Go
254 lines
6.5 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
logger "git.site.quack-lab.dev/dave/cylogger"
|
|
utils "git.site.quack-lab.dev/dave/cyutils"
|
|
"github.com/joho/godotenv"
|
|
)
|
|
|
|
// Endpoints, paging state and credentials shared by the functions in this file.
var (
	// ESIUrl is the market-orders endpoint that FetchMarketPage pages through.
	ESIUrl = "https://esi.evetech.net/markets/10000002/orders"

	// clickhouseURL is the ClickHouse HTTP endpoint every query is POSTed to.
	clickhouseURL = "http://mclickhouse.site.quack-lab.dev"

	// totalPages starts at 1 and is overwritten by FetchMarketPage with the
	// value of the X-Pages response header.
	totalPages = 1

	// ClickHouse basic-auth credentials, filled from the environment in main.
	clickhouseUser     string
	clickhousePassword string
)
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
logger.InitFlag()
|
|
|
|
godotenv.Load()
|
|
|
|
clickhouseUser = os.Getenv("CLICKHOUSE_USER")
|
|
clickhousePassword = os.Getenv("CLICKHOUSE_PASSWORD")
|
|
if clickhouseUser == "" || clickhousePassword == "" {
|
|
logger.Error("CLICKHOUSE_USER and CLICKHOUSE_PASSWORD must be set")
|
|
return
|
|
}
|
|
logger.Info("CLICKHOUSE_USER: %s", clickhouseUser)
|
|
shortPw := clickhousePassword
|
|
if len(shortPw) > 4 {
|
|
shortPw = shortPw[:4]
|
|
}
|
|
logger.Info("CLICKHOUSE_PASSWORD: %s", shortPw)
|
|
|
|
// Initialize ClickHouse table
|
|
if err := InitClickhouseTable(); err != nil {
|
|
logger.Error("Failed to initialize ClickHouse table: %v", err)
|
|
}
|
|
|
|
http.HandleFunc("/process", processHandler)
|
|
|
|
port := "8090"
|
|
logger.Info("Starting server on port %s", port)
|
|
if err := http.ListenAndServe(":"+port, nil); err != nil {
|
|
logger.Error("Failed to start server: %v", err)
|
|
}
|
|
}
|
|
|
|
// isRunning is the run guard used by processHandler: it is flipped from 0 to 1
// with an atomic compare-and-swap when a run starts and stored back to 0 when
// the background run returns.
var isRunning uint32
|
|
|
|
func processHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !atomic.CompareAndSwapUint32(&isRunning, 0, 1) {
|
|
w.Write([]byte("Already running"))
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
defer atomic.StoreUint32(&isRunning, 0)
|
|
logger.Info("Starting market data processing")
|
|
if err := ProcessMarketData(); err != nil {
|
|
logger.Error("Failed to process market data: %v", err)
|
|
} else {
|
|
logger.Info("Market data processing completed")
|
|
}
|
|
}()
|
|
|
|
w.Write([]byte("Started"))
|
|
}
|
|
|
|
func InitClickhouseTable() error {
|
|
query := `
|
|
CREATE TABLE IF NOT EXISTS market_orders (
|
|
order_id UInt64,
|
|
type_id UInt64,
|
|
issued DateTime,
|
|
price UInt64,
|
|
volume_remain UInt64,
|
|
volume_total UInt64,
|
|
is_buy_order Bool,
|
|
inserted_at DateTime
|
|
)
|
|
ENGINE = MergeTree()
|
|
ORDER BY (type_id, issued, order_id)
|
|
PARTITION BY toYYYYMM(issued)
|
|
SETTINGS index_granularity = 8192
|
|
`
|
|
|
|
// Create indices for common query patterns
|
|
indices := []string{
|
|
"CREATE INDEX IF NOT EXISTS idx_price_skip_index ON market_orders (price, type_id) TYPE minmax GRANULARITY 4",
|
|
"CREATE INDEX IF NOT EXISTS idx_buy_order_skip_index ON market_orders (is_buy_order, type_id) TYPE set(4) GRANULARITY 4",
|
|
"CREATE INDEX IF NOT EXISTS idx_inserted_at_skip_index ON market_orders (inserted_at) TYPE minmax GRANULARITY 4",
|
|
}
|
|
|
|
// Create table
|
|
logger.Info("Creating ClickHouse table...")
|
|
if err := executeClickhouseQuery(query); err != nil {
|
|
return fmt.Errorf("failed to create table: %w", err)
|
|
}
|
|
|
|
// Create indices
|
|
logger.Info("Creating ClickHouse indices...")
|
|
for _, idx := range indices {
|
|
if err := executeClickhouseQuery(idx); err != nil {
|
|
logger.Warning("Failed to create index (may already exist): %v", err)
|
|
}
|
|
}
|
|
|
|
logger.Info("ClickHouse table initialization complete")
|
|
return nil
|
|
}
|
|
|
|
func executeClickhouseQuery(query string) error {
|
|
req, err := http.NewRequest("POST", clickhouseURL, bytes.NewReader([]byte(query)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Type", "text/plain")
|
|
req.SetBasicAuth(clickhouseUser, clickhousePassword)
|
|
|
|
client := &http.Client{}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("ClickHouse query failed: %s", body)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func ProcessMarketData() error {
|
|
// Generate a single timestamp for this entire run
|
|
runTimestamp := time.Now().Format("2006-01-02 15:04:05")
|
|
logger.Info("Run timestamp: %s", runTimestamp)
|
|
|
|
marketData, err := FetchMarketPage(1)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch market page %d: %w", 1, err)
|
|
}
|
|
|
|
logger.Info("Market data: %+v", marketData)
|
|
logger.Info("Total pages: %d", totalPages)
|
|
pages := make([]int, totalPages)
|
|
for i := 1; i <= totalPages; i++ {
|
|
pages[i-1] = i
|
|
}
|
|
|
|
utils.WithWorkers(8, pages, func(worker int, page int, value int) {
|
|
flog := logger.Default.
|
|
WithPrefix(fmt.Sprintf("Worker %d", worker)).
|
|
WithPrefix(fmt.Sprintf("Page %d", page))
|
|
|
|
flog.Info("Fetching market data")
|
|
marketData, err := FetchMarketPage(page)
|
|
if err != nil {
|
|
flog.Error(fmt.Sprintf("Failed to fetch market data: %v", err))
|
|
return
|
|
}
|
|
flog.Info("Inserting market data")
|
|
err = InsertClickhouse(marketData, runTimestamp)
|
|
if err != nil {
|
|
flog.Error(fmt.Sprintf("Failed to insert market data: %v", err))
|
|
return
|
|
}
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func FetchMarketPage(page int) (MarketResponse, error) {
|
|
var marketData MarketResponse
|
|
|
|
url := fmt.Sprintf("%s?page=%d", ESIUrl, page)
|
|
resp, err := http.Get(url)
|
|
if err != nil {
|
|
return marketData, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
totalPagesStr := resp.Header.Get("X-Pages")
|
|
totalPagesInt, err := strconv.Atoi(totalPagesStr)
|
|
if err != nil {
|
|
return marketData, err
|
|
}
|
|
totalPages = totalPagesInt
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return marketData, err
|
|
}
|
|
|
|
err = json.Unmarshal(body, &marketData)
|
|
if err != nil {
|
|
return marketData, err
|
|
}
|
|
|
|
return marketData, nil
|
|
}
|
|
|
|
func InsertClickhouse(marketData MarketResponse, runTimestamp string) error {
|
|
// Build ClickHouse batch INSERT query
|
|
var insertParts = []string{
|
|
"INSERT INTO market_orders (order_id,type_id,issued,price,volume_remain,volume_total,is_buy_order,inserted_at)",
|
|
"VALUES",
|
|
}
|
|
|
|
values := make([]string, 0, len(marketData))
|
|
for _, order := range marketData {
|
|
// Format issued as 'YYYY-MM-DD HH:MM:SS'
|
|
issued := order.Issued.Format("2006-01-02 15:04:05")
|
|
order.Price *= 100 // Float -> int, mISK -> ISK
|
|
val := fmt.Sprintf("(%d, %d, '%s', %f, %d, %d, %t, '%s')", order.OrderID, order.TypeID, issued, order.Price, order.VolumeRemain, order.VolumeTotal, order.IsBuyOrder, runTimestamp)
|
|
values = append(values, val)
|
|
}
|
|
|
|
query := fmt.Sprintf("%s %s", insertParts[0], insertParts[1]) + " " + strings.Join(values, ", ")
|
|
|
|
req, err := http.NewRequest("POST", clickhouseURL, strings.NewReader(query))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Type", "text/plain")
|
|
req.SetBasicAuth(clickhouseUser, clickhousePassword)
|
|
|
|
client := &http.Client{}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("ClickHouse insert failed: %s", body)
|
|
}
|
|
|
|
return nil
|
|
}
|