package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	logger "git.site.quack-lab.dev/dave/cylogger"
	utils "git.site.quack-lab.dev/dave/cyutils"
	"github.com/nsqio/go-nsq"
)

// runFileReaderStage scans the "data" directory for *.bz2 files and hands each
// one to utils.WithWorkers together with stage1Workers. For every file the
// callback creates its own NSQ producer, publishes the file's killmails
// through a FileReaderHandler and stops the producer again.
//
// Failures are logged and never returned: an unreadable data directory aborts
// the stage, while a failure on a single file only skips that file.
func runFileReaderStage() {
	logger.Info("Starting file reader stage")

	killmailFiles, err := os.ReadDir("data")
	if err != nil {
		logger.Error("Failed to read data directory: %v", err)
		return
	}

	var filesToProcess []string
	for _, file := range killmailFiles {
		// Skip directories: only regular entries can be killmail archives,
		// even if a directory happens to be named "*.bz2".
		if file.IsDir() || !strings.HasSuffix(file.Name(), ".bz2") {
			continue
		}
		filesToProcess = append(filesToProcess, filepath.Join("data", file.Name()))
	}
	logger.Info("Found %d files to process", len(filesToProcess))

	// NOTE(review): context.Background() is never cancelled, so the ctx checks
	// below are inert until a cancellable context (e.g. signal-driven) is
	// wired in here.
	ctx := context.Background()

	// NOTE(review): presumably WithWorkers runs the callback concurrently on
	// stage1Workers workers and returns once every file was handled — confirm
	// against cyutils.
	utils.WithWorkers(stage1Workers, filesToProcess, func(worker int, index int, filePath string) {
		if ctx.Err() != nil {
			return
		}

		fileLog := logger.Default.
			WithPrefix(fmt.Sprintf("worker %d", worker)).
			WithPrefix(fmt.Sprintf("file %d of %d", index+1, len(filesToProcess))).
			WithPrefix(filepath.Base(filePath))
		fileLog.Info("Processing file")

		config := nsq.NewConfig()
		config.WriteTimeout = 10 * time.Second
		config.DialTimeout = 5 * time.Second

		producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), config)
		if err != nil {
			fileLog.Error("Error creating producer: %v", err)
			return
		}
		defer producer.Stop()

		handler := &FileReaderHandler{
			producer: producer,
			workerID: worker,
		}

		err = handler.processFile(ctx, filePath)
		if err != nil {
			fileLog.Error("Failed to process file: %v", err)
			return
		}

		fileLog.Info("Finished processing file")
	})

	logger.Info("File reader stage completed")
}

// FileReaderHandler publishes the killmails of a single file to NSQ.
type FileReaderHandler struct {
	producer *nsq.Producer // producer used for every publish of this handler
	workerID int           // worker number, used only for log prefixes
}

// processFile loads all killmails from filePath via LoadBz2Killmails, marshals
// each one to JSON and publishes it to the "killmail-queue" topic.
//
// A killmail that cannot be marshalled is logged and skipped. A failed publish
// is retried once per second until it succeeds or ctx is cancelled.
//
// It returns the load error if the file cannot be read, ctx.Err() if ctx is
// cancelled before all killmails were published, and nil otherwise.
func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) error {
	// Delay between two publish attempts of the same killmail.
	const retryDelay = 1 * time.Second

	messagelog := logger.Default.WithPrefix(fmt.Sprintf("worker_%d", h.workerID)).WithPrefix(filepath.Base(filePath))
	messagelog.Info("Loading killmails from %s", filePath)

	killmails, err := LoadBz2Killmails(filePath)
	if err != nil {
		messagelog.Error("Failed to load killmails: %v", err)
		return err
	}
	messagelog.Info("Loaded %d killmails, publishing to NSQ", len(killmails))

	published := 0
	for i, killmail := range killmails {
		if ctx.Err() != nil {
			return ctx.Err()
		}

		killmailBytes, err := json.Marshal(killmail)
		if err != nil {
			messagelog.Error("Failed to marshal killmail: %v", err)
			continue
		}

		for {
			err = h.producer.Publish("killmail-queue", killmailBytes)
			if err == nil {
				break
			}
			messagelog.Error("Failed to publish killmail, retrying: %v", err)

			// Wait before retrying, but give up immediately on cancellation
			// instead of sleeping blindly and retrying forever.
			retry := time.NewTimer(retryDelay)
			select {
			case <-ctx.Done():
				retry.Stop()
				return ctx.Err()
			case <-retry.C:
			}
		}

		published++
		if published%1000 == 0 {
			messagelog.Info("Published %d killmails (%d/%d)", published, i+1, len(killmails))
		}
	}

	messagelog.Info("Published %d killmails", published)
	return nil
}