diff --git a/lnsq/nsq.go b/lnsq/nsq.go
index 352b4e1..076f0e1 100644
--- a/lnsq/nsq.go
+++ b/lnsq/nsq.go
@@ -1,7 +1,10 @@
 package lnsq
 
 import (
+	"encoding/json"
 	"fmt"
+	"io"
+	"net/http"
 	"time"
 
 	"zkillsusser/config"
@@ -74,3 +77,50 @@ func NewProducer() (*nsq.Producer, error) {
 
 	return producer, nil
 }
+
+type TopicStats struct {
+	TopicName    string `json:"topic_name"`
+	MessageCount int64  `json:"message_count"`
+	Depth        int64  `json:"depth"`
+}
+
+type TopicStatsResponse struct {
+	Topics []TopicStats `json:"topics"`
+}
+
+func GetTopicDepth(topic string) (int64, error) {
+	statsURL := fmt.Sprintf("http://%s:%d/stats?format=json&topic=%s", config.NSQHost, config.NSQPort+1, topic)
+
+	client := &http.Client{Timeout: 5 * time.Second}
+	resp, err := client.Get(statsURL)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get topic stats: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return 0, fmt.Errorf("failed to get topic stats: status %d", resp.StatusCode)
+	}
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return 0, fmt.Errorf("failed to read stats response: %w", err)
+	}
+
+	var statsResp struct {
+		Data struct {
+			Topics []TopicStats `json:"topics"`
+		} `json:"data"`
+	}
+	if err := json.Unmarshal(body, &statsResp); err != nil {
+		return 0, fmt.Errorf("failed to unmarshal stats: %w", err)
+	}
+
+	for _, t := range statsResp.Data.Topics {
+		if t.TopicName == topic {
+			return t.Depth, nil
+		}
+	}
+
+	return 0, nil
+}
diff --git a/pipeline/reader/reader.go b/pipeline/reader/reader.go
index 61c86f0..cf261f8 100644
--- a/pipeline/reader/reader.go
+++ b/pipeline/reader/reader.go
@@ -11,6 +11,11 @@ import (
 	"zkillsusser/lnsq"
 
 	logger "git.site.quack-lab.dev/dave/cylogger"
+	"github.com/nsqio/go-nsq"
+)
+
+const (
+	PollInterval = 30 * time.Second
 )
 
 func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
@@ -32,8 +37,50 @@ func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseCl
 	}
 	defer producer.Stop()
 
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 
+	if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil {
+		if ctx.Err() != nil {
+			return nil
+		}
+		logger.Error("Failed to process missing IDs on startup: %v", err)
+	}
+
+	ticker := time.NewTicker(PollInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Info("Reader shutting down")
+			return nil
+		case <-ticker.C:
+			if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil {
+				if ctx.Err() != nil {
+					return nil
+				}
+				logger.Error("Failed to process missing IDs: %v", err)
+			}
+		}
+	}
+}
+
+func pollAndProcess(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer, topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
+	depth, err := lnsq.GetTopicDepth(topic)
+	if err != nil {
+		return fmt.Errorf("failed to get topic depth: %w", err)
+	}
+
+	if depth > 0 {
+		logger.Info("Topic %s has %d messages, skipping query", topic, depth)
+		return nil
+	}
+
+	return processMissingIDs(ctx, chClient, producer, topic, getMissing)
+}
+
+func processMissingIDs(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer, topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
 	ids, err := getMissing(ctx, chClient)
 	if err != nil {
 		return fmt.Errorf("failed to get missing IDs: %w", err)