From 43a49c4dbddeefac12e04729c602978af4aad56a Mon Sep 17 00:00:00 2001 From: PhatPhuckDave Date: Mon, 26 Jan 2026 12:22:28 +0100 Subject: [PATCH] Add character name readers --- .../main.go | 167 ++++++++++++++++++ pipeline/missing-item-type-reader/main.go | 26 ++- .../main.go | 167 ++++++++++++++++++ 3 files changed, 353 insertions(+), 7 deletions(-) create mode 100644 pipeline/missing-attacker-character-name-reader/main.go create mode 100644 pipeline/missing-victim-character-name-reader/main.go diff --git a/pipeline/missing-attacker-character-name-reader/main.go b/pipeline/missing-attacker-character-name-reader/main.go new file mode 100644 index 0000000..133b930 --- /dev/null +++ b/pipeline/missing-attacker-character-name-reader/main.go @@ -0,0 +1,167 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "zkillsusser/clickhouse" + "zkillsusser/config" + "zkillsusser/lnsq" + + logger "git.site.quack-lab.dev/dave/cylogger" + "github.com/nsqio/go-nsq" +) + +const ( + BatchSize = 1000 + Topic = "missing-attacker-character-name-queue" + PollInterval = 30 * time.Second +) + +type AttackerCharacterIDMessage struct { + CharacterID int64 `json:"character_id"` +} + +func main() { + if err := config.InitConfig(); err != nil { + logger.Error("Failed to initialize config: %v", err) + return + } + + logger.InitFlag() + logger.Info("Starting missing attacker character name reader: reading from ClickHouse and writing to NSQ") + + chClient, err := clickhouse.NewClient() + if err != nil { + logger.Error("Failed to create ClickHouse client: %v", err) + return + } + defer chClient.Close() + + producer, err := lnsq.NewProducer() + if err != nil { + logger.Error("Failed to create NSQ producer: %v", err) + return + } + defer producer.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigChan + logger.Info("Shutdown signal received, stopping gracefully...") + cancel() + }() + + ticker := time.NewTicker(PollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + logger.Info("Missing attacker character name reader shutting down") + return + case <-ticker.C: + if err := processMissingAttackerCharacterNames(ctx, chClient, producer); err != nil { + if ctx.Err() != nil { + return + } + logger.Error("Failed to process missing attacker character names: %v", err) + } + } + } +} + +func processMissingAttackerCharacterNames(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer) error { + characterIDs, err := queryMissingAttackerCharacterNames(ctx, chClient) + if err != nil { + return fmt.Errorf("failed to query missing attacker character names: %w", err) + } + + if len(characterIDs) == 0 { + logger.Info("No missing attacker character names found") + return nil + } + + logger.Info("Found %d distinct missing attacker character IDs, publishing in batches", len(characterIDs)) + + published := 0 + for i := 0; i < len(characterIDs); i += BatchSize { + if ctx.Err() != nil { + return ctx.Err() + } + + end := i + BatchSize + if end > len(characterIDs) { + end = len(characterIDs) + } + + batch := characterIDs[i:end] + for _, characterID := range batch { + if ctx.Err() != nil { + return ctx.Err() + } + + message := AttackerCharacterIDMessage{ + CharacterID: characterID, + } + + messageBytes, err := json.Marshal(message) + if err != nil { + logger.Error("Failed to marshal attacker character ID message: %v", err) + continue + } + + for { + err = producer.Publish(Topic, messageBytes) + if err == nil { + break + } + logger.Error("Failed to publish attacker character ID, retrying: %v", err) + time.Sleep(1 * time.Second) + } + + published++ + } + + logger.Info("Published batch of %d attacker character IDs (total: %d/%d)", len(batch), published, len(characterIDs)) + } + + logger.Info("Published all %d missing attacker character IDs", published) + return nil +} + +func queryMissingAttackerCharacterNames(ctx context.Context, chClient *clickhouse.ClickhouseClient) ([]int64, error) { + query := ` + SELECT DISTINCT character_id + FROM zkill.killmail_attackers + WHERE character_name = '' OR character_name IS NULL + ORDER BY character_id + ` + + rows, err := chClient.Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + var characterIDs []int64 + for rows.Next() { + var characterID int64 + if err := rows.Scan(&characterID); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + characterIDs = append(characterIDs, characterID) + } + + return characterIDs, rows.Err() +} diff --git a/pipeline/missing-item-type-reader/main.go b/pipeline/missing-item-type-reader/main.go index db3b9b9..7d27833 100644 --- a/pipeline/missing-item-type-reader/main.go +++ b/pipeline/missing-item-type-reader/main.go @@ -18,8 +18,9 @@ import ( ) const ( - BatchSize = 1000 - Topic = "missing-item-type-queue" + BatchSize = 1000 + Topic = "missing-item-type-queue" + PollInterval = 30 * time.Second ) type ItemTypeIDMessage struct { @@ -61,12 +62,23 @@ func main() { cancel() }() - if err := processMissingItemTypes(ctx, chClient, producer); err != nil { - logger.Error("Failed to process missing item types: %v", err) - return - } + ticker := time.NewTicker(PollInterval) + defer ticker.Stop() - logger.Info("Missing item type reader completed") + for { + select { + case <-ctx.Done(): + logger.Info("Missing item type reader shutting down") + return + case <-ticker.C: + if err := processMissingItemTypes(ctx, chClient, producer); err != nil { + if ctx.Err() != nil { + return + } + logger.Error("Failed to process missing item types: %v", err) + } + } + } } func processMissingItemTypes(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer) error { diff --git a/pipeline/missing-victim-character-name-reader/main.go b/pipeline/missing-victim-character-name-reader/main.go new file mode 100644 index 0000000..b17eaac --- /dev/null +++ b/pipeline/missing-victim-character-name-reader/main.go @@ -0,0 +1,167 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "zkillsusser/clickhouse" + "zkillsusser/config" + "zkillsusser/lnsq" + + logger "git.site.quack-lab.dev/dave/cylogger" + "github.com/nsqio/go-nsq" +) + +const ( + BatchSize = 1000 + Topic = "missing-victim-character-name-queue" + PollInterval = 30 * time.Second +) + +type VictimCharacterIDMessage struct { + VictimCharacterID int64 `json:"victim_character_id"` +} + +func main() { + if err := config.InitConfig(); err != nil { + logger.Error("Failed to initialize config: %v", err) + return + } + + logger.InitFlag() + logger.Info("Starting missing victim character name reader: reading from ClickHouse and writing to NSQ") + + chClient, err := clickhouse.NewClient() + if err != nil { + logger.Error("Failed to create ClickHouse client: %v", err) + return + } + defer chClient.Close() + + producer, err := lnsq.NewProducer() + if err != nil { + logger.Error("Failed to create NSQ producer: %v", err) + return + } + defer producer.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigChan + logger.Info("Shutdown signal received, stopping gracefully...") + cancel() + }() + + ticker := time.NewTicker(PollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + logger.Info("Missing victim character name reader shutting down") + return + case <-ticker.C: + if err := processMissingVictimCharacterNames(ctx, chClient, producer); err != nil { + if ctx.Err() != nil { + return + } + logger.Error("Failed to process missing victim character names: %v", err) + } + } + } +} + +func processMissingVictimCharacterNames(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer) error { + victimCharacterIDs, err := queryMissingVictimCharacterNames(ctx, chClient) + if err != nil { + return fmt.Errorf("failed to query missing victim character names: %w", err) + } + + if len(victimCharacterIDs) == 0 { + logger.Info("No missing victim character names found") + return nil + } + + logger.Info("Found %d distinct missing victim character IDs, publishing in batches", len(victimCharacterIDs)) + + published := 0 + for i := 0; i < len(victimCharacterIDs); i += BatchSize { + if ctx.Err() != nil { + return ctx.Err() + } + + end := i + BatchSize + if end > len(victimCharacterIDs) { + end = len(victimCharacterIDs) + } + + batch := victimCharacterIDs[i:end] + for _, victimCharacterID := range batch { + if ctx.Err() != nil { + return ctx.Err() + } + + message := VictimCharacterIDMessage{ + VictimCharacterID: victimCharacterID, + } + + messageBytes, err := json.Marshal(message) + if err != nil { + logger.Error("Failed to marshal victim character ID message: %v", err) + continue + } + + for { + err = producer.Publish(Topic, messageBytes) + if err == nil { + break + } + logger.Error("Failed to publish victim character ID, retrying: %v", err) + time.Sleep(1 * time.Second) + } + + published++ + } + + logger.Info("Published batch of %d victim character IDs (total: %d/%d)", len(batch), published, len(victimCharacterIDs)) + } + + logger.Info("Published all %d missing victim character IDs", published) + return nil +} + +func queryMissingVictimCharacterNames(ctx context.Context, chClient *clickhouse.ClickhouseClient) ([]int64, error) { + query := ` + SELECT DISTINCT victim_character_id + FROM zkill.killmails + WHERE victim_character_name = '' OR victim_character_name IS NULL + ORDER BY victim_character_id + ` + + rows, err := chClient.Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + var victimCharacterIDs []int64 + for rows.Next() { + var victimCharacterID int64 + if err := rows.Scan(&victimCharacterID); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + victimCharacterIDs = append(victimCharacterIDs, victimCharacterID) + } + + return victimCharacterIDs, rows.Err() +}