Add db polling to readers

So we may let it run overnight
This commit is contained in:
2026-01-26 14:30:38 +01:00
parent 61d2d14fb8
commit adde43563e
2 changed files with 98 additions and 1 deletions

View File

@@ -1,7 +1,10 @@
package lnsq
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"zkillsusser/config"
@@ -74,3 +77,50 @@ func NewProducer() (*nsq.Producer, error) {
return producer, nil
}
// TopicStats holds the per-topic statistics fields we read from the nsqd
// HTTP /stats endpoint (format=json).
type TopicStats struct {
	TopicName    string `json:"topic_name"`    // topic identifier as reported by nsqd
	MessageCount int64  `json:"message_count"` // total messages received for the topic
	Depth        int64  `json:"depth"`         // messages currently queued, i.e. not yet consumed
}
// TopicStatsResponse models the top-level "topics" array of an nsqd stats
// response.
//
// NOTE(review): this type is not referenced by GetTopicDepth, which decodes
// into a local anonymous struct instead — confirm whether it is used
// elsewhere or is dead code.
type TopicStatsResponse struct {
	Topics []TopicStats `json:"topics"`
}
// GetTopicDepth returns the current queue depth (messages not yet consumed)
// for topic, as reported by the nsqd HTTP stats endpoint.
//
// It queries http://<NSQHost>:<NSQPort+1>/stats?format=json&topic=<topic>
// (NSQPort+1 is presumably the nsqd HTTP port next to the TCP port — confirm
// against the deployment config). A topic that does not appear in the
// response (e.g. not created yet) yields depth 0 with a nil error.
func GetTopicDepth(topic string) (int64, error) {
	statsURL := fmt.Sprintf("http://%s:%d/stats?format=json&topic=%s", config.NSQHost, config.NSQPort+1, topic)
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(statsURL)
	if err != nil {
		return 0, fmt.Errorf("failed to get topic stats: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("failed to get topic stats: status %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, fmt.Errorf("failed to read stats response: %w", err)
	}
	return parseTopicDepth(body, topic)
}

// parseTopicDepth extracts the depth of topic from a raw nsqd stats JSON
// payload. nsqd >= 1.0 reports "topics" at the top level of the response,
// while older versions wrap everything in a "data" envelope; both layouts
// are accepted. Previously only the legacy "data" envelope was parsed, so
// modern nsqd always reported depth 0 and the poll guard never fired.
func parseTopicDepth(body []byte, topic string) (int64, error) {
	var statsResp struct {
		Topics []TopicStats `json:"topics"` // nsqd >= 1.0 layout
		Data   struct {
			Topics []TopicStats `json:"topics"`
		} `json:"data"` // pre-1.0 layout
	}
	if err := json.Unmarshal(body, &statsResp); err != nil {
		return 0, fmt.Errorf("failed to unmarshal stats: %w", err)
	}
	topics := statsResp.Topics
	if len(topics) == 0 {
		topics = statsResp.Data.Topics
	}
	for _, t := range topics {
		if t.TopicName == topic {
			return t.Depth, nil
		}
	}
	// Topic not present in stats: treat as empty rather than as an error.
	return 0, nil
}

View File

@@ -11,6 +11,11 @@ import (
"zkillsusser/lnsq"
logger "git.site.quack-lab.dev/dave/cylogger"
"github.com/nsqio/go-nsq"
)
const (
	// PollInterval is how often the reader re-checks the topic depth and,
	// when the topic is drained, re-queries the database for missing IDs.
	PollInterval = 30 * time.Second
)
func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
@@ -32,8 +37,50 @@ func Run(topic string, getMissing func(context.Context, *clickhouse.ClickhouseCl
}
defer producer.Stop()
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil {
if ctx.Err() != nil {
return nil
}
logger.Error("Failed to process missing IDs on startup: %v", err)
}
ticker := time.NewTicker(PollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
logger.Info("Reader shutting down")
return nil
case <-ticker.C:
if err := pollAndProcess(ctx, chClient, producer, topic, getMissing); err != nil {
if ctx.Err() != nil {
return nil
}
logger.Error("Failed to process missing IDs: %v", err)
}
}
}
}
// pollAndProcess checks the NSQ topic's current depth and, only when the
// topic is fully drained, runs getMissing against ClickHouse and publishes
// the resulting IDs. A non-empty topic is logged and skipped so consumers
// are not flooded with duplicate work.
func pollAndProcess(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer, topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
	pending, err := lnsq.GetTopicDepth(topic)
	switch {
	case err != nil:
		return fmt.Errorf("failed to get topic depth: %w", err)
	case pending > 0:
		// Messages are still in flight; don't pile more work on top.
		logger.Info("Topic %s has %d messages, skipping query", topic, pending)
		return nil
	}
	return processMissingIDs(ctx, chClient, producer, topic, getMissing)
}
func processMissingIDs(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer, topic string, getMissing func(context.Context, *clickhouse.ClickhouseClient) ([]int64, error)) error {
ids, err := getMissing(ctx, chClient)
if err != nil {
return fmt.Errorf("failed to get missing IDs: %w", err)