diff --git a/lnsq/nsq.go b/lnsq/nsq.go index 0cfc617..d801572 100644 --- a/lnsq/nsq.go +++ b/lnsq/nsq.go @@ -83,19 +83,26 @@ func NewProducer() (*nsq.Producer, error) { return producer, nil } -type TopicStats struct { - TopicName string `json:"topic_name"` - MessageCount int64 `json:"message_count"` - Depth int64 `json:"depth"` +type ChannelStats struct { + ChannelName string `json:"channel_name"` + Depth int64 `json:"depth"` + InFlight int64 `json:"in_flight_count"` } -type TopicStatsResponse struct { +type TopicStats struct { + TopicName string `json:"topic_name"` + MessageCount int64 `json:"message_count"` + Depth int64 `json:"depth"` + Channels []ChannelStats `json:"channels"` +} + +type StatsResponse struct { Topics []TopicStats `json:"topics"` } func GetTopicDepth(topic string) (int64, error) { statsURL := fmt.Sprintf("http://%s:%d/stats?format=json&topic=%s", config.NSQHost, config.NSQPort+1, topic) - + client := &http.Client{Timeout: 5 * time.Second} resp, err := client.Get(statsURL) if err != nil { @@ -104,7 +111,8 @@ func GetTopicDepth(topic string) (int64, error) { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return 0, fmt.Errorf("failed to get topic stats: status %d", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + return 0, fmt.Errorf("failed to get topic stats: status %d, body: %s", resp.StatusCode, string(body)) } body, err := io.ReadAll(resp.Body) @@ -112,18 +120,18 @@ func GetTopicDepth(topic string) (int64, error) { return 0, fmt.Errorf("failed to read stats response: %w", err) } - var statsResp struct { - Data struct { - Topics []TopicStats `json:"topics"` - } `json:"data"` - } + var statsResp StatsResponse if err := json.Unmarshal(body, &statsResp); err != nil { - return 0, fmt.Errorf("failed to unmarshal stats: %w", err) + return 0, fmt.Errorf("failed to unmarshal stats: %w, body: %s", err, string(body)) } - for _, t := range statsResp.Data.Topics { + for _, t := range statsResp.Topics { if t.TopicName == topic { - return t.Depth, nil + totalDepth := t.Depth + for _, ch := range t.Channels { + totalDepth += ch.Depth + ch.InFlight + } + return totalDepth, nil } } diff --git a/pipeline/reader/reader.go b/pipeline/reader/reader.go index cf261f8..5ecb5e0 100644 --- a/pipeline/reader/reader.go +++ b/pipeline/reader/reader.go @@ -15,7 +15,8 @@ import ( ) const ( - PollInterval = 30 * time.Second + PollInterval = 30 * time.Second + LogDepthInterval = 60 * time.Second ) func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error { @@ -40,6 +41,14 @@ func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseCl ctx, cancel := context.WithCancel(context.Background()) defer cancel() + pollTicker := time.NewTicker(PollInterval) + defer pollTicker.Stop() + + depthTicker := time.NewTicker(LogDepthInterval) + defer depthTicker.Stop() + + logTopicDepth(topic) + if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil { if ctx.Err() != nil { return nil @@ -47,15 +56,14 @@ func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseCl logger.Error("Failed to process missing IDs on startup: %v", err) } - ticker := time.NewTicker(PollInterval) - defer ticker.Stop() - for { select { case <-ctx.Done(): logger.Info("Reader shutting down") return nil - case <-ticker.C: + case <-depthTicker.C: + logTopicDepth(topic) + case <-pollTicker.C: if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil { if ctx.Err() != nil { return nil @@ -66,17 +74,30 @@ func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseCl } } +func logTopicDepth(topic string) { + depth, err := lnsq.GetTopicDepth(topic) + if err != nil { + logger.Error("Failed to get topic depth for %s: %v", topic, err) + return + } + logger.Info("Topic %s has %d remaining messages", topic, depth) +} + func pollAndProcess(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer, topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error { depth, err := lnsq.GetTopicDepth(topic) if err != nil { - return fmt.Errorf("failed to get topic depth: %w", err) + logger.Error("Failed to get topic depth for %s: %v, skipping query", topic, err) + return nil } + logger.Info("Checking topic %s depth before poll: %d messages", topic, depth) + if depth > 0 { logger.Info("Topic %s has %d messages, skipping query", topic, depth) return nil } + logger.Info("Topic %s is empty, querying for missing IDs", topic) return processMissingIDs(ctx, chClient, producer, topic, getMissing) } @@ -117,5 +138,15 @@ func processMissingIDs(ctx context.Context, chClient *clickhouse.ClickhouseClien } logger.Info("Published all %d missing IDs", published) + + time.Sleep(2 * time.Second) + + finalDepth, err := lnsq.GetTopicDepth(topic) + if err != nil { + logger.Error("Failed to check depth after publishing: %v", err) + } else { + logger.Info("Topic %s depth after publishing: %d messages", topic, finalDepth) + } + return nil } diff --git a/pipeline/resolver/resolver.go b/pipeline/resolver/resolver.go index 192ad0c..fe0d7f9 100644 --- a/pipeline/resolver/resolver.go +++ b/pipeline/resolver/resolver.go @@ -64,7 +64,26 @@ func Run(topic, channel string, nConsumers int, resolveID func(context.Context, logger.Info("All %d consumers connected, consuming from topic: %s, channel: %s", nConsumers, topic, channel) - select {} + depthTicker := time.NewTicker(60 * time.Second) + defer depthTicker.Stop() + + logTopicDepth(topic) + + for { + select { + case <-depthTicker.C: + logTopicDepth(topic) + } + } +} + +func logTopicDepth(topic string) { + depth, err := lnsq.GetTopicDepth(topic) + if err != nil { + logger.Error("Failed to get topic depth for %s: %v", topic, err) + return + } + logger.Info("Topic %s has %d remaining messages", topic, depth) } type NSQHandler struct {