107 lines
2.2 KiB
Go
107 lines
2.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
logger "git.site.quack-lab.dev/dave/cylogger"
|
|
"github.com/nsqio/go-nsq"
|
|
)
|
|
|
|
// inserterProcessedCount counts the killmails that inserter handlers have
// saved successfully. It is incremented with sync/atomic in HandleMessage, so
// any other access should go through sync/atomic as well.
var inserterProcessedCount int64
|
|
|
|
func runInserterStage() {
|
|
logger.Info("Starting inserter stage")
|
|
|
|
db, err := GetDB()
|
|
if err != nil {
|
|
logger.Error("Failed to get database: %v", err)
|
|
return
|
|
}
|
|
|
|
config := nsq.NewConfig()
|
|
config.MaxAttempts = 0
|
|
config.MaxInFlight = stage3Workers
|
|
config.MsgTimeout = 300 * time.Second
|
|
|
|
consumer, err := nsq.NewConsumer("flat-killmail-queue", "inserter", config)
|
|
if err != nil {
|
|
logger.Error("Error creating consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
for i := 0; i < stage3Workers; i++ {
|
|
handler := &InserterHandler{
|
|
db: db,
|
|
workerID: i,
|
|
}
|
|
consumer.AddHandler(handler)
|
|
}
|
|
|
|
err = consumer.ConnectToNSQD(fmt.Sprintf("%s:%d", nsqHost, nsqPort))
|
|
if err != nil {
|
|
logger.Error("Error connecting to NSQ: %v", err)
|
|
return
|
|
}
|
|
|
|
logger.Info("Connected to NSQ at %s:%d", nsqHost, nsqPort)
|
|
|
|
select {} // Block forever, terminate immediately on Ctrl-C
|
|
}
|
|
|
|
type InserterHandler struct {
|
|
db DB
|
|
workerID int
|
|
}
|
|
|
|
func (h *InserterHandler) HandleMessage(message *nsq.Message) error {
|
|
messagelog := logger.Default.WithPrefix(fmt.Sprintf("worker_%d", h.workerID)).WithPrefix(fmt.Sprintf("attempts=%d", message.Attempts))
|
|
|
|
var flatMsg FlatKillmailMessage
|
|
err := json.Unmarshal(message.Body, &flatMsg)
|
|
if err != nil {
|
|
messagelog.Error("Error unmarshalling flattened killmail: %v", err)
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
message.Touch()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
messagelog = messagelog.WithPrefix(fmt.Sprintf("killmail_%d", flatMsg.Killmail.KillmailID))
|
|
|
|
err = h.db.SaveFlatKillmails(
|
|
[]*FlatKillmail{flatMsg.Killmail},
|
|
flatMsg.Attackers,
|
|
flatMsg.Items,
|
|
)
|
|
if err != nil {
|
|
messagelog.Error("Failed to save killmail: %v", err)
|
|
return err
|
|
}
|
|
|
|
count := atomic.AddInt64(&inserterProcessedCount, 1)
|
|
if count%1000 == 0 {
|
|
logger.Info("Inserted %d killmails", count)
|
|
}
|
|
|
|
message.Finish()
|
|
return nil
|
|
}
|