From 6bba290cd45ac74a2678fb3fb86bb274e14ad541 Mon Sep 17 00:00:00 2001 From: PhatPhuckDave Date: Sun, 25 Jan 2026 13:34:54 +0100 Subject: [PATCH] Get rid of broken nsq batching --- config.go | 2 -- file_reader.go | 54 +++++++++++++++++++++----------------------------- 2 files changed, 23 insertions(+), 33 deletions(-) diff --git a/config.go b/config.go index 11def54..bafad9e 100644 --- a/config.go +++ b/config.go @@ -36,7 +36,6 @@ var ( stage1Workers int stage2Workers int stage3Workers int - nsqBatchSize int serverPort string serverMode bool @@ -60,7 +59,6 @@ func initConfig() error { stage1Workers = getEnvInt("STAGE1_WORKERS", 6) stage2Workers = getEnvInt("STAGE2_WORKERS", 4) stage3Workers = getEnvInt("STAGE3_WORKERS", 4) - nsqBatchSize = getEnvInt("NSQ_BATCH_SIZE", 5000) serverPort = getEnv("SERVER_PORT", "3000") serverMode = getEnv("SERVER", "false") == "true" diff --git a/file_reader.go b/file_reader.go index e1d7a8a..13b81d5 100644 --- a/file_reader.go +++ b/file_reader.go @@ -19,13 +19,6 @@ import ( func runFileReaderStage() { logger.Info("Starting file reader stage") - producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), nsq.NewConfig()) - if err != nil { - logger.Error("Error creating producer: %v", err) - return - } - defer producer.Stop() - killmailFiles, err := os.ReadDir("data") if err != nil { logger.Error("Failed to read data directory: %v", err) @@ -64,12 +57,22 @@ func runFileReaderStage() { WithPrefix(filepath.Base(filePath)) fileLog.Info("Processing file") + config := nsq.NewConfig() + config.WriteTimeout = 10 * time.Second + config.DialTimeout = 5 * time.Second + producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), config) + if err != nil { + fileLog.Error("Error creating producer: %v", err) + return + } + defer producer.Stop() + handler := &FileReaderHandler{ producer: producer, workerID: worker, } - err := handler.processFile(ctx, filePath) + err = handler.processFile(ctx, filePath) if err != nil { fileLog.Error("Failed to process file: %v", err) return @@ -98,44 +101,33 @@ func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) er messagelog.Info("Loaded %d killmails, publishing to NSQ", len(killmails)) - for i := 0; i < len(killmails); i += nsqBatchSize { + published := 0 + for i, killmail := range killmails { if ctx.Err() != nil { return ctx.Err() } - end := i + nsqBatchSize - if end > len(killmails) { - end = len(killmails) - } - - batch := killmails[i:end] - messages := make([][]byte, 0, len(batch)) - - for _, killmail := range batch { - killmailBytes, err := json.Marshal(killmail) - if err != nil { - messagelog.Error("Failed to marshal killmail: %v", err) - continue - } - messages = append(messages, killmailBytes) - } - - if len(messages) == 0 { + killmailBytes, err := json.Marshal(killmail) + if err != nil { + messagelog.Error("Failed to marshal killmail: %v", err) continue } for { - err = h.producer.MultiPublish("killmail-queue", messages) + err = h.producer.Publish("killmail-queue", killmailBytes) if err == nil { break } - messagelog.Error("Failed to publish batch, retrying: %v", err) + messagelog.Error("Failed to publish killmail, retrying: %v", err) time.Sleep(1 * time.Second) } - messagelog.Info("Published batch of %d killmails (%d/%d)", len(messages), end, len(killmails)) + published++ + if published%1000 == 0 { + messagelog.Info("Published %d killmails (%d/%d)", published, i+1, len(killmails)) + } } - messagelog.Info("Published %d killmails", len(killmails)) + messagelog.Info("Published %d killmails", published) return nil }