From ecdb41217e6c197aa9c19d6954abc7744b8c6abc Mon Sep 17 00:00:00 2001 From: PhatPhuckDave Date: Mon, 26 Jan 2026 11:51:29 +0100 Subject: [PATCH] Add item type reader --- mprocs.yaml | 12 ++ pipeline/missing-item-type-reader/main.go | 155 ++++++++++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 pipeline/missing-item-type-reader/main.go diff --git a/mprocs.yaml b/mprocs.yaml index b0f4673..50bdce4 100644 --- a/mprocs.yaml +++ b/mprocs.yaml @@ -52,3 +52,15 @@ procs: autostart: false stop: send-keys: [""] + + missing-item-type-reader: + shell: "go run ." + cwd: "pipeline/missing-item-type-reader" + env: + CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev" + CLICKHOUSE_DATABASE: "zkill" + CLICKHOUSE_USERNAME: "default" + CLICKHOUSE_PASSWORD: "" + autostart: false + stop: + send-keys: [""] diff --git a/pipeline/missing-item-type-reader/main.go b/pipeline/missing-item-type-reader/main.go new file mode 100644 index 0000000..2c70ba0 --- /dev/null +++ b/pipeline/missing-item-type-reader/main.go @@ -0,0 +1,155 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "zkillsusser/clickhouse" + "zkillsusser/config" + "zkillsusser/lnsq" + + logger "git.site.quack-lab.dev/dave/cylogger" + "github.com/nsqio/go-nsq" +) + +const ( + BatchSize = 1000 + Topic = "missing-item-type-queue" +) + +type ItemTypeIDMessage struct { + ItemTypeID int32 `json:"item_type_id"` +} + +func main() { + if err := config.InitConfig(); err != nil { + logger.Error("Failed to initialize config: %v", err) + return + } + + logger.InitFlag() + logger.Info("Starting missing item type reader: reading from ClickHouse and writing to NSQ") + + chClient, err := clickhouse.NewClient() + if err != nil { + logger.Error("Failed to create ClickHouse client: %v", err) + return + } + defer chClient.Close() + + producer, err := lnsq.NewProducer() + if err != nil { + logger.Error("Failed to create NSQ producer: %v", err) + return + } + defer producer.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigChan + logger.Info("Shutdown signal received, stopping gracefully...") + cancel() + }() + + if err := processMissingItemTypes(ctx, chClient, producer); err != nil { + logger.Error("Failed to process missing item types: %v", err) + return + } + + logger.Info("Missing item type reader completed") +} + +func processMissingItemTypes(ctx context.Context, chClient *clickhouse.Client, producer *nsq.Producer) error { + itemTypeIDs, err := queryMissingItemTypes(ctx, chClient) + if err != nil { + return fmt.Errorf("failed to query missing item types: %w", err) + } + + if len(itemTypeIDs) == 0 { + logger.Info("No missing item types found") + return nil + } + + logger.Info("Found %d distinct missing item type IDs, publishing in batches", len(itemTypeIDs)) + + published := 0 + for i := 0; i < len(itemTypeIDs); i += BatchSize { + if ctx.Err() != nil { + return ctx.Err() + } + + end := i + BatchSize + if end > len(itemTypeIDs) { + end = len(itemTypeIDs) + } + + batch := itemTypeIDs[i:end] + for _, itemTypeID := range batch { + if ctx.Err() != nil { + return ctx.Err() + } + + message := ItemTypeIDMessage{ + ItemTypeID: itemTypeID, + } + + messageBytes, err := json.Marshal(message) + if err != nil { + logger.Error("Failed to marshal item type ID message: %v", err) + continue + } + + for { + err = producer.Publish(Topic, messageBytes) + if err == nil { + break + } + logger.Error("Failed to publish item type ID, retrying: %v", err) + time.Sleep(1 * time.Second) + } + + published++ + } + + logger.Info("Published batch of %d item type IDs (total: %d/%d)", len(batch), published, len(itemTypeIDs)) + } + + logger.Info("Published all %d missing item type IDs", published) + return nil +} + +func queryMissingItemTypes(ctx context.Context, chClient *clickhouse.Client) ([]int32, error) { + query := ` + SELECT DISTINCT item_type_id + FROM zkill.killmail_items + WHERE item_type_name = '' OR item_type_name IS NULL + ORDER BY item_type_id + ` + + rows, err := chClient.Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + var itemTypeIDs []int32 + for rows.Next() { + var itemTypeID int32 + if err := rows.Scan(&itemTypeID); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + itemTypeIDs = append(itemTypeIDs, itemTypeID) + } + + return itemTypeIDs, rows.Err() +}