Files
zkill-susser/flattener.go

143 lines
3.1 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"time"
logger "git.site.quack-lab.dev/dave/cylogger"
"github.com/nsqio/go-nsq"
)
var flattenerProcessedCount int64
func runFlattenerStage() {
logger.Info("Starting flattener stage")
db, err := GetDB()
if err != nil {
logger.Error("Failed to get database: %v", err)
return
}
config := nsq.NewConfig()
config.MaxAttempts = 0
config.MaxInFlight = stage2Workers
config.MsgTimeout = 300 * time.Second
consumer, err := nsq.NewConsumer("killmail-queue", "flattener", config)
if err != nil {
logger.Error("Error creating consumer: %v", err)
return
}
// Store consumer reference and MaxInFlight for rate limit pausing
flattenerConsumer = consumer
flattenerMaxInFlight = stage2Workers
producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), nsq.NewConfig())
if err != nil {
logger.Error("Error creating producer: %v", err)
return
}
defer producer.Stop()
// Ping to establish connection
if err := producer.Ping(); err != nil {
logger.Error("Failed to ping NSQ producer: %v", err)
return
}
for i := 0; i < stage2Workers; i++ {
handler := &FlattenerHandler{
db: db,
producer: producer,
workerID: i,
}
consumer.AddHandler(handler)
}
err = consumer.ConnectToNSQD(fmt.Sprintf("%s:%d", nsqHost, nsqPort))
if err != nil {
logger.Error("Error connecting to NSQ: %v", err)
return
}
logger.Info("Connected to NSQ at %s:%d", nsqHost, nsqPort)
select {} // Block forever, terminate immediately on Ctrl-C
}
type FlattenerHandler struct {
db DB
producer *nsq.Producer
workerID int
}
func (h *FlattenerHandler) HandleMessage(message *nsq.Message) error {
messagelog := logger.Default.WithPrefix(fmt.Sprintf("worker_%d", h.workerID)).WithPrefix(fmt.Sprintf("attempts=%d", message.Attempts))
var killmail Killmail
err := json.Unmarshal(message.Body, &killmail)
if err != nil {
messagelog.Error("Error unmarshalling killmail: %v", err)
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
message.Touch()
case <-ctx.Done():
return
}
}
}()
messagelog = messagelog.WithPrefix(fmt.Sprintf("killmail_%d", killmail.KillmailID))
flatKillmail, flatAttackers, flatItems, err := FlattenKillmail(h.db, killmail)
if err != nil {
messagelog.Error("Failed to flatten killmail: %v", err)
return err
}
flatMsg := FlatKillmailMessage{
Killmail: flatKillmail,
Attackers: flatAttackers,
Items: flatItems,
}
flatMsgBytes, err := json.Marshal(flatMsg)
if err != nil {
messagelog.Error("Failed to marshal flattened killmail: %v", err)
return err
}
for {
err = h.producer.Publish("flat-killmail-queue", flatMsgBytes)
if err == nil {
break
}
messagelog.Error("Failed to publish flattened killmail, retrying: %v", err)
time.Sleep(1 * time.Second)
}
count := atomic.AddInt64(&flattenerProcessedCount, 1)
if count%1000 == 0 {
logger.Info("Processed %d killmails", count)
}
message.Finish()
return nil
}