Files
zkill-susser/inserter.go

107 lines
2.2 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"time"
logger "git.site.quack-lab.dev/dave/cylogger"
"github.com/nsqio/go-nsq"
)
var inserterProcessedCount int64
func runInserterStage() {
logger.Info("Starting inserter stage")
db, err := GetDB()
if err != nil {
logger.Error("Failed to get database: %v", err)
return
}
config := nsq.NewConfig()
config.MaxAttempts = 0
config.MaxInFlight = stage3Workers
config.MsgTimeout = 300 * time.Second
consumer, err := nsq.NewConsumer("flat-killmail-queue", "inserter", config)
if err != nil {
logger.Error("Error creating consumer: %v", err)
return
}
for i := 0; i < stage3Workers; i++ {
handler := &InserterHandler{
db: db,
workerID: i,
}
consumer.AddHandler(handler)
}
err = consumer.ConnectToNSQD(fmt.Sprintf("%s:%d", nsqHost, nsqPort))
if err != nil {
logger.Error("Error connecting to NSQ: %v", err)
return
}
logger.Info("Connected to NSQ at %s:%d", nsqHost, nsqPort)
select {} // Block forever, terminate immediately on Ctrl-C
}
type InserterHandler struct {
db DB
workerID int
}
func (h *InserterHandler) HandleMessage(message *nsq.Message) error {
messagelog := logger.Default.WithPrefix(fmt.Sprintf("worker_%d", h.workerID)).WithPrefix(fmt.Sprintf("attempts=%d", message.Attempts))
var flatMsg FlatKillmailMessage
err := json.Unmarshal(message.Body, &flatMsg)
if err != nil {
messagelog.Error("Error unmarshalling flattened killmail: %v", err)
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
message.Touch()
case <-ctx.Done():
return
}
}
}()
messagelog = messagelog.WithPrefix(fmt.Sprintf("killmail_%d", flatMsg.Killmail.KillmailID))
err = h.db.SaveFlatKillmails(
[]*FlatKillmail{flatMsg.Killmail},
flatMsg.Attackers,
flatMsg.Items,
)
if err != nil {
messagelog.Error("Failed to save killmail: %v", err)
return err
}
count := atomic.AddInt64(&inserterProcessedCount, 1)
if count%1000 == 0 {
logger.Info("Inserted %d killmails", count)
}
message.Finish()
return nil
}