Files
zkill-susser/pipeline/reader/reader.go
PhatPhuckDave adde43563e Add db polling to readers
So we can let it run overnight
2026-01-26 14:31:49 +01:00

122 lines
2.8 KiB
Go

package reader
import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"zkillsusser/clickhouse"
	"zkillsusser/config"
	"zkillsusser/lnsq"

	logger "git.site.quack-lab.dev/dave/cylogger"
	"github.com/nsqio/go-nsq"
)
// PollInterval is the delay between successive poll cycles in Run, after the
// initial cycle that runs immediately on startup.
const PollInterval = 30 * time.Second
// Run is the shared entry point for a reader. It initializes configuration
// and logging, opens a ClickHouse client and an NSQ producer, and then polls:
// once immediately and afterwards every PollInterval, it runs pollAndProcess,
// which publishes the IDs returned by getMissing to topic.
//
// Run blocks until the process receives SIGINT (os.Interrupt) or SIGTERM. It
// then stops polling, runs the deferred Close/Stop cleanup and returns nil.
// A non-nil error is returned only when setup fails. Errors from individual
// poll cycles are logged, and polling continues.
func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
	if err := config.InitConfig(); err != nil {
		return fmt.Errorf("failed to initialize config: %w", err)
	}
	logger.InitFlag()

	chClient, err := clickhouse.NewClient()
	if err != nil {
		return fmt.Errorf("failed to create ClickHouse client: %w", err)
	}
	defer chClient.Close()

	producer, err := lnsq.NewProducer()
	if err != nil {
		return fmt.Errorf("failed to create NSQ producer: %w", err)
	}
	defer producer.Stop()

	// ctx must actually be cancellable, otherwise the shutdown branch below
	// and the ctx.Err() checks in the poll path can never fire. Tying it to
	// SIGINT/SIGTERM lets an interrupt unwind through the defers above.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	// After the first signal has cancelled ctx, unregister the handler so a
	// second interrupt restores the default behavior and kills the process
	// if graceful shutdown is stuck in a blocking call.
	go func() {
		<-ctx.Done()
		stop()
	}()

	if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil {
		if ctx.Err() != nil {
			return nil
		}
		logger.Error("Failed to process missing IDs on startup: %v", err)
	}

	ticker := time.NewTicker(PollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			logger.Info("Reader shutting down")
			return nil
		case <-ticker.C:
			if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil {
				// An error caused by cancellation is the normal shutdown path.
				if ctx.Err() != nil {
					return nil
				}
				logger.Error("Failed to process missing IDs: %v", err)
			}
		}
	}
}
// pollAndProcess runs a single poll cycle for topic.
//
// It first asks lnsq.GetTopicDepth how many messages the topic holds. If the
// depth is non-zero, the cycle is skipped and nil is returned (presumably so
// IDs are not re-enqueued while consumers are still draining the previous
// batch — confirm). Otherwise the work is delegated to processMissingIDs.
// A depth lookup failure is returned wrapped.
func pollAndProcess(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer, topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
	pending, depthErr := lnsq.GetTopicDepth(topic)
	switch {
	case depthErr != nil:
		return fmt.Errorf("failed to get topic depth: %w", depthErr)
	case pending > 0:
		logger.Info("Topic %s has %d messages, skipping query", topic, pending)
		return nil
	default:
		return processMissingIDs(ctx, chClient, producer, topic, getMissing)
	}
}
// publishRetryDelay is the pause between attempts to publish a single ID
// after producer.Publish has returned an error.
const publishRetryDelay = 1 * time.Second

// processMissingIDs asks getMissing for the IDs that still need processing
// and publishes each one to topic, as its base-10 decimal text.
//
// Each ID is retried until it is published successfully. Both the per-ID
// loop and the retry waits honor ctx: once ctx is cancelled, the function
// stops and returns ctx.Err(). IDs already published are not rolled back.
// Progress is logged every 1000 published IDs. An error from getMissing is
// returned wrapped.
func processMissingIDs(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer, topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
	ids, err := getMissing(ctx, chClient)
	if err != nil {
		return fmt.Errorf("failed to get missing IDs: %w", err)
	}
	if len(ids) == 0 {
		logger.Info("No missing IDs found")
		return nil
	}
	logger.Info("Found %d distinct missing IDs", len(ids))

	published := 0
	for _, id := range ids {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := publishWithRetry(ctx, producer, topic, strconv.AppendInt(nil, id, 10)); err != nil {
			return err
		}
		published++
		if published%1000 == 0 {
			logger.Info("Published %d/%d IDs", published, len(ids))
		}
	}
	logger.Info("Published all %d missing IDs", published)
	return nil
}

// publishWithRetry publishes body to topic, waiting publishRetryDelay after
// every failed attempt. It returns nil once a publish succeeds, or ctx.Err()
// if ctx is cancelled while it is waiting to retry. Unlike a bare
// time.Sleep loop, this lets shutdown interrupt an NSQ outage.
func publishWithRetry(ctx context.Context, producer *nsq.Producer, topic string, body []byte) error {
	for {
		err := producer.Publish(topic, body)
		if err == nil {
			return nil
		}
		logger.Error("Failed to publish ID, retrying: %v", err)

		timer := time.NewTimer(publishRetryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
}