143 lines
3.1 KiB
Go
143 lines
3.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
logger "git.site.quack-lab.dev/dave/cylogger"
|
|
"github.com/nsqio/go-nsq"
|
|
)
|
|
|
|
// flattenerProcessedCount is the number of killmails the flattener handlers
// have published so far. It is incremented with atomic.AddInt64 from
// HandleMessage, so it must only be accessed through sync/atomic.
var flattenerProcessedCount int64
|
|
|
|
func runFlattenerStage() {
|
|
logger.Info("Starting flattener stage")
|
|
|
|
db, err := GetDB()
|
|
if err != nil {
|
|
logger.Error("Failed to get database: %v", err)
|
|
return
|
|
}
|
|
|
|
config := nsq.NewConfig()
|
|
config.MaxAttempts = 0
|
|
config.MaxInFlight = stage2Workers
|
|
config.MsgTimeout = 300 * time.Second
|
|
|
|
consumer, err := nsq.NewConsumer("killmail-queue", "flattener", config)
|
|
if err != nil {
|
|
logger.Error("Error creating consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
// Store consumer reference and MaxInFlight for rate limit pausing
|
|
flattenerConsumer = consumer
|
|
flattenerMaxInFlight = stage2Workers
|
|
|
|
producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), nsq.NewConfig())
|
|
if err != nil {
|
|
logger.Error("Error creating producer: %v", err)
|
|
return
|
|
}
|
|
defer producer.Stop()
|
|
|
|
// Ping to establish connection
|
|
if err := producer.Ping(); err != nil {
|
|
logger.Error("Failed to ping NSQ producer: %v", err)
|
|
return
|
|
}
|
|
|
|
for i := 0; i < stage2Workers; i++ {
|
|
handler := &FlattenerHandler{
|
|
db: db,
|
|
producer: producer,
|
|
workerID: i,
|
|
}
|
|
consumer.AddHandler(handler)
|
|
}
|
|
|
|
err = consumer.ConnectToNSQD(fmt.Sprintf("%s:%d", nsqHost, nsqPort))
|
|
if err != nil {
|
|
logger.Error("Error connecting to NSQ: %v", err)
|
|
return
|
|
}
|
|
|
|
logger.Info("Connected to NSQ at %s:%d", nsqHost, nsqPort)
|
|
|
|
select {} // Block forever, terminate immediately on Ctrl-C
|
|
}
|
|
|
|
type FlattenerHandler struct {
|
|
db DB
|
|
producer *nsq.Producer
|
|
workerID int
|
|
}
|
|
|
|
func (h *FlattenerHandler) HandleMessage(message *nsq.Message) error {
|
|
messagelog := logger.Default.WithPrefix(fmt.Sprintf("worker_%d", h.workerID)).WithPrefix(fmt.Sprintf("attempts=%d", message.Attempts))
|
|
|
|
var killmail Killmail
|
|
err := json.Unmarshal(message.Body, &killmail)
|
|
if err != nil {
|
|
messagelog.Error("Error unmarshalling killmail: %v", err)
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
message.Touch()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
messagelog = messagelog.WithPrefix(fmt.Sprintf("killmail_%d", killmail.KillmailID))
|
|
|
|
flatKillmail, flatAttackers, flatItems, err := FlattenKillmail(h.db, killmail)
|
|
if err != nil {
|
|
messagelog.Error("Failed to flatten killmail: %v", err)
|
|
return err
|
|
}
|
|
|
|
flatMsg := FlatKillmailMessage{
|
|
Killmail: flatKillmail,
|
|
Attackers: flatAttackers,
|
|
Items: flatItems,
|
|
}
|
|
|
|
flatMsgBytes, err := json.Marshal(flatMsg)
|
|
if err != nil {
|
|
messagelog.Error("Failed to marshal flattened killmail: %v", err)
|
|
return err
|
|
}
|
|
|
|
for {
|
|
err = h.producer.Publish("flat-killmail-queue", flatMsgBytes)
|
|
if err == nil {
|
|
break
|
|
}
|
|
messagelog.Error("Failed to publish flattened killmail, retrying: %v", err)
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
count := atomic.AddInt64(&flattenerProcessedCount, 1)
|
|
if count%1000 == 0 {
|
|
logger.Info("Processed %d killmails", count)
|
|
}
|
|
|
|
message.Finish()
|
|
return nil
|
|
}
|