153 lines
3.7 KiB
Go
153 lines
3.7 KiB
Go
package reader
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"zkillsusser/clickhouse"
|
|
"zkillsusser/config"
|
|
"zkillsusser/lnsq"
|
|
|
|
logger "git.site.quack-lab.dev/dave/cylogger"
|
|
"github.com/nsqio/go-nsq"
|
|
)
|
|
|
|
const (
|
|
PollInterval = 30 * time.Second
|
|
LogDepthInterval = 60 * time.Second
|
|
)
|
|
|
|
func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
|
|
if err := config.InitConfig(); err != nil {
|
|
return fmt.Errorf("failed to initialize config: %w", err)
|
|
}
|
|
|
|
logger.InitFlag()
|
|
|
|
chClient, err := clickhouse.NewClient()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create ClickHouse client: %w", err)
|
|
}
|
|
defer chClient.Close()
|
|
|
|
producer, err := lnsq.NewProducer()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create NSQ producer: %w", err)
|
|
}
|
|
defer producer.Stop()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
pollTicker := time.NewTicker(PollInterval)
|
|
defer pollTicker.Stop()
|
|
|
|
depthTicker := time.NewTicker(LogDepthInterval)
|
|
defer depthTicker.Stop()
|
|
|
|
logTopicDepth(topic)
|
|
|
|
if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil {
|
|
if ctx.Err() != nil {
|
|
return nil
|
|
}
|
|
logger.Error("Failed to process missing IDs on startup: %v", err)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Info("Reader shutting down")
|
|
return nil
|
|
case <-depthTicker.C:
|
|
logTopicDepth(topic)
|
|
case <-pollTicker.C:
|
|
if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil {
|
|
if ctx.Err() != nil {
|
|
return nil
|
|
}
|
|
logger.Error("Failed to process missing IDs: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func logTopicDepth(topic string) {
|
|
depth, err := lnsq.GetTopicDepth(topic)
|
|
if err != nil {
|
|
logger.Error("Failed to get topic depth for %s: %v", topic, err)
|
|
return
|
|
}
|
|
logger.Info("Topic %s has %d remaining messages", topic, depth)
|
|
}
|
|
|
|
func pollAndProcess(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer, topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
|
|
depth, err := lnsq.GetTopicDepth(topic)
|
|
if err != nil {
|
|
logger.Error("Failed to get topic depth for %s: %v, skipping query", topic, err)
|
|
return nil
|
|
}
|
|
|
|
logger.Info("Checking topic %s depth before poll: %d messages", topic, depth)
|
|
|
|
if depth > 0 {
|
|
logger.Info("Topic %s has %d messages, skipping query", topic, depth)
|
|
return nil
|
|
}
|
|
|
|
logger.Info("Topic %s is empty, querying for missing IDs", topic)
|
|
return processMissingIDs(ctx, chClient, producer, topic, getMissing)
|
|
}
|
|
|
|
func processMissingIDs(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer, topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
|
|
ids, err := getMissing(ctx, chClient)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get missing IDs: %w", err)
|
|
}
|
|
|
|
if len(ids) == 0 {
|
|
logger.Info("No missing IDs found")
|
|
return nil
|
|
}
|
|
|
|
logger.Info("Found %d distinct missing IDs", len(ids))
|
|
|
|
published := 0
|
|
for _, id := range ids {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
messageBytes := []byte(strconv.FormatInt(id, 10))
|
|
|
|
for {
|
|
err := producer.Publish(topic, messageBytes)
|
|
if err == nil {
|
|
break
|
|
}
|
|
logger.Error("Failed to publish ID, retrying: %v", err)
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
published++
|
|
if published%1000 == 0 {
|
|
logger.Info("Published %d/%d IDs", published, len(ids))
|
|
}
|
|
}
|
|
|
|
logger.Info("Published all %d missing IDs", published)
|
|
|
|
time.Sleep(2 * time.Second)
|
|
|
|
finalDepth, err := lnsq.GetTopicDepth(topic)
|
|
if err != nil {
|
|
logger.Error("Failed to check depth after publishing: %v", err)
|
|
} else {
|
|
logger.Info("Topic %s depth after publishing: %d messages", topic, finalDepth)
|
|
}
|
|
|
|
return nil
|
|
}
|