Files
zkill-susser/file_reader.go

122 lines
2.8 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"time"
logger "git.site.quack-lab.dev/dave/cylogger"
utils "git.site.quack-lab.dev/dave/cyutils"
"github.com/nsqio/go-nsq"
)
// runFileReaderStage collects every entry in the "data" directory whose name
// ends in ".bz2" and passes the resulting paths to utils.WithWorkers together
// with stage1Workers and a per-file callback. The callback creates an NSQ
// producer for the file, runs FileReaderHandler.processFile on it and stops the
// producer when it returns. Errors are logged; nothing is returned to the caller.
func runFileReaderStage() {
	logger.Info("Starting file reader stage")

	entries, readErr := os.ReadDir("data")
	if readErr != nil {
		logger.Error("Failed to read data directory: %v", readErr)
		return
	}

	// Keep only the bzip2 archives, addressed relative to the data directory.
	var archives []string
	for _, entry := range entries {
		name := entry.Name()
		if !strings.HasSuffix(name, ".bz2") {
			continue
		}
		archives = append(archives, filepath.Join("data", name))
	}
	total := len(archives)
	logger.Info("Found %d files to process", total)

	ctx := context.Background()

	// processOne handles a single archive on behalf of one worker.
	processOne := func(worker int, index int, filePath string) {
		if ctx.Err() != nil {
			return
		}

		workerPrefix := fmt.Sprintf("worker %d", worker)
		progressPrefix := fmt.Sprintf("file %d of %d", index+1, total)
		fileLog := logger.Default.
			WithPrefix(workerPrefix).
			WithPrefix(progressPrefix).
			WithPrefix(filepath.Base(filePath))
		fileLog.Info("Processing file")

		cfg := nsq.NewConfig()
		cfg.DialTimeout = 5 * time.Second
		cfg.WriteTimeout = 10 * time.Second

		addr := fmt.Sprintf("%s:%d", nsqHost, nsqPort)
		producer, err := nsq.NewProducer(addr, cfg)
		if err != nil {
			fileLog.Error("Error creating producer: %v", err)
			return
		}
		// The producer lives for exactly one file.
		defer producer.Stop()

		handler := &FileReaderHandler{producer: producer, workerID: worker}
		if err := handler.processFile(ctx, filePath); err != nil {
			fileLog.Error("Failed to process file: %v", err)
			return
		}
		fileLog.Info("Finished processing file")
	}

	utils.WithWorkers(stage1Workers, archives, processOne)
	logger.Info("File reader stage completed")
}
// FileReaderHandler publishes the killmails of one file to NSQ on behalf of a
// single worker.
type FileReaderHandler struct {
	// producer is the NSQ producer that messages are published through.
	producer *nsq.Producer
	// workerID identifies the worker; it is used in log prefixes.
	workerID int
}
// processFile loads the killmails stored in filePath with LoadBz2Killmails,
// marshals each one to JSON and publishes it to the "killmail-queue" NSQ topic
// through h.producer.
//
// A killmail that cannot be marshalled is logged and skipped. A failed publish
// is retried once per second until it succeeds or ctx is cancelled. Progress is
// logged after every 1000 published killmails.
//
// It returns the error from LoadBz2Killmails if loading fails, ctx.Err() if the
// context is cancelled before or during publishing, and nil otherwise.
func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) error {
	messagelog := logger.Default.
		WithPrefix(fmt.Sprintf("worker_%d", h.workerID)).
		WithPrefix(filepath.Base(filePath))

	messagelog.Info("Loading killmails from %s", filePath)
	killmails, err := LoadBz2Killmails(filePath)
	if err != nil {
		messagelog.Error("Failed to load killmails: %v", err)
		return err
	}
	messagelog.Info("Loaded %d killmails, publishing to NSQ", len(killmails))

	published := 0
	for i, killmail := range killmails {
		if err := ctx.Err(); err != nil {
			return err
		}

		killmailBytes, err := json.Marshal(killmail)
		if err != nil {
			messagelog.Error("Failed to marshal killmail: %v", err)
			continue
		}

		for {
			err = h.producer.Publish("killmail-queue", killmailBytes)
			if err == nil {
				break
			}
			messagelog.Error("Failed to publish killmail, retrying: %v", err)

			// Wait before retrying, but give up as soon as the context is
			// cancelled so an unreachable NSQ cannot pin this worker forever.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(1 * time.Second):
			}
		}

		published++
		if published%1000 == 0 {
			messagelog.Info("Published %d killmails (%d/%d)", published, i+1, len(killmails))
		}
	}

	messagelog.Info("Published %d killmails", published)
	return nil
}