From c0dd32e90eedadd26fbe9b192075a87b2e0072c5 Mon Sep 17 00:00:00 2001 From: PhatPhuckDave Date: Sun, 25 Jan 2026 12:50:33 +0100 Subject: [PATCH] Run nsq locally http is a nightmare... --- .dockerignore | 1 + .gitignore | 1 + config.go | 10 ++--- ...-development.yml => docker-compose-dev.yml | 43 +++++++++---------- file_reader.go | 35 ++++++--------- flattener.go | 43 +++++++------------ inserter.go | 4 +- mprocs.yaml | 12 +++--- 8 files changed, 63 insertions(+), 86 deletions(-) rename docker-compose-development.yml => docker-compose-dev.yml (53%) diff --git a/.dockerignore b/.dockerignore index 5517ac1..0031751 100644 --- a/.dockerignore +++ b/.dockerignore @@ -6,6 +6,7 @@ frontend/ # Data directory (runtime data, not needed for build) data/ +nsq_data/ .cache diff --git a/.gitignore b/.gitignore index dc87070..f0992ae 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ dbgate-data clickhouse_data *.exe .cache +nsq_data diff --git a/config.go b/config.go index 46e0592..1a045ce 100644 --- a/config.go +++ b/config.go @@ -51,10 +51,10 @@ var ( func initConfig() error { godotenv.Load() - nsqHost = getEnv("NSQ_HOST", "nsq.site.quack-lab.dev") - nsqPort = getEnvInt("NSQ_PORT", 4151) - nsqLookupHost = getEnv("NSQ_LOOKUP_HOST", "nsqlookup.site.quack-lab.dev") - nsqLookupPort = getEnvInt("NSQ_LOOKUP_PORT", 4161) + nsqHost = getEnv("NSQ_HOST", "localhost") + nsqPort = getEnvInt("NSQ_PORT", 4150) + nsqLookupHost = getEnv("NSQ_LOOKUP_HOST", "localhost") + nsqLookupPort = getEnvInt("NSQ_LOOKUP_PORT", 4160) stage1Workers = getEnvInt("STAGE1_WORKERS", 6) stage2Workers = getEnvInt("STAGE2_WORKERS", 4) @@ -64,7 +64,7 @@ func initConfig() error { serverMode = getEnv("SERVER", "false") == "true" stage = getEnv("STAGE", "") - clickhouseHost = getEnv("CLICKHOUSE_HOST", "clickhouse.site.quack-lab.dev") + clickhouseHost = getEnv("CLICKHOUSE_HOST", "localhost") clickhousePort = getEnvInt("CLICKHOUSE_PORT", 8123) clickhouseDatabase = getEnv("CLICKHOUSE_DATABASE", "zkill") clickhouseUsername = getEnv("CLICKHOUSE_USERNAME", "default") diff --git a/docker-compose-development.yml b/docker-compose-dev.yml similarity index 53% rename from docker-compose-development.yml rename to docker-compose-dev.yml index e584ed3..a243ea6 100644 --- a/docker-compose-development.yml +++ b/docker-compose-dev.yml @@ -1,6 +1,26 @@ version: '3.8' services: + nsqlookupd: + image: nsqio/nsq:latest + command: /nsqlookupd + ports: + - "4160:4160" + - "4161:4161" + volumes: + - ./nsq_data:/data + + nsqd: + image: nsqio/nsq:latest + command: /nsqd --broadcast-address=nsqd --lookupd-tcp-address=nsqlookupd:4160 + ports: + - "4150:4150" + - "4151:4151" + volumes: + - ./nsq_data:/data + depends_on: + - nsqlookupd + clickhouse: image: clickhouse/clickhouse-server:latest ports: @@ -13,26 +33,3 @@ services: - CLICKHOUSE_USER=default - CLICKHOUSE_PASSWORD= - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 - - backend: - build: - context: . - dockerfile: Dockerfile - ports: - - "3000:3000" - volumes: - - ./data:/data - - ./sqlite-latest.sqlite:/sqlite-latest.sqlite - environment: - - PORT=3000 - depends_on: - - clickhouse - - frontend: - build: - context: ./frontend - dockerfile: Dockerfile - ports: - - "8080:8080" - depends_on: - - backend diff --git a/file_reader.go b/file_reader.go index 3001a16..4c74c50 100644 --- a/file_reader.go +++ b/file_reader.go @@ -1,11 +1,9 @@ package main import ( - "bytes" "context" "encoding/json" "fmt" - "net/http" "os" "os/signal" "path/filepath" @@ -15,14 +13,18 @@ import ( logger "git.site.quack-lab.dev/dave/cylogger" utils "git.site.quack-lab.dev/dave/cyutils" + "github.com/nsqio/go-nsq" ) func runFileReaderStage() { logger.Info("Starting file reader stage") - httpClient := &http.Client{ - Timeout: 30 * time.Second, + producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), nsq.NewConfig()) + if err != nil { + logger.Error("Error creating producer: %v", err) + return } + defer producer.Stop() killmailFiles, err := os.ReadDir("data") if err != nil { @@ -63,8 +65,8 @@ func runFileReaderStage() { fileLog.Info("Processing file") handler := &FileReaderHandler{ - httpClient: httpClient, - workerID: worker, + producer: producer, + workerID: worker, } err := handler.processFile(ctx, filePath) @@ -80,8 +82,8 @@ func runFileReaderStage() { } type FileReaderHandler struct { - httpClient *http.Client - workerID int + producer *nsq.Producer + workerID int } func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) error { @@ -108,23 +110,10 @@ func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) er } for { - url := fmt.Sprintf("https://%s/pub?topic=killmail-queue", nsqHost) - req, err := http.NewRequest("POST", url, bytes.NewReader(killmailBytes)) - if err != nil { - messagelog.Error("Failed to create request, retrying: %v", err) - time.Sleep(1 * time.Second) - continue - } - req.Header.Set("Content-Type", "application/json") - - resp, err := h.httpClient.Do(req) - if err == nil && resp.StatusCode == 200 { - resp.Body.Close() + err = h.producer.Publish("killmail-queue", killmailBytes) + if err == nil { break } - if resp != nil { - resp.Body.Close() - } messagelog.Error("Failed to publish killmail, retrying: %v", err) time.Sleep(1 * time.Second) } diff --git a/flattener.go b/flattener.go index e7390fc..1f61338 100644 --- a/flattener.go +++ b/flattener.go @@ -1,11 +1,9 @@ package main import ( - "bytes" "context" "encoding/json" "fmt" - "net/http" "os" "os/signal" "syscall" @@ -28,6 +26,7 @@ func runFlattenerStage() { config.MaxAttempts = 0 config.MaxInFlight = stage2Workers config.MsgTimeout = 300 * time.Second + config.LookupdPollInterval = 15 * time.Second consumer, err := nsq.NewConsumer("killmail-queue", "flattener", config) if err != nil { @@ -35,26 +34,29 @@ func runFlattenerStage() { return } - httpClient := &http.Client{ - Timeout: 30 * time.Second, + producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), nsq.NewConfig()) + if err != nil { + logger.Error("Error creating producer: %v", err) + return } + defer producer.Stop() for i := 0; i < stage2Workers; i++ { handler := &FlattenerHandler{ - db: db, - httpClient: httpClient, - workerID: i, + db: db, + producer: producer, + workerID: i, } consumer.AddHandler(handler) } - err = consumer.ConnectToNSQLookupd(nsqLookupHost) + err = consumer.ConnectToNSQLookupd(fmt.Sprintf("%s:%d", nsqLookupHost, nsqLookupPort)) if err != nil { logger.Error("Error connecting to NSQ lookupd: %v", err) return } - logger.Info("Connected to NSQ lookupd at %s", nsqLookupHost) + logger.Info("Connected to NSQ lookupd at %s:%d", nsqLookupHost, nsqLookupPort) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) @@ -70,9 +72,9 @@ func runFlattenerStage() { } type FlattenerHandler struct { - db DB - httpClient *http.Client - workerID int + db DB + producer *nsq.Producer + workerID int } func (h *FlattenerHandler) HandleMessage(message *nsq.Message) error { @@ -125,23 +127,10 @@ func (h *FlattenerHandler) HandleMessage(message *nsq.Message) error { } for { - url := fmt.Sprintf("https://%s/pub?topic=flat-killmail-queue", nsqHost) - req, err := http.NewRequest("POST", url, bytes.NewReader(flatMsgBytes)) - if err != nil { - messagelog.Error("Failed to create request, retrying: %v", err) - time.Sleep(1 * time.Second) - continue - } - req.Header.Set("Content-Type", "application/json") - - resp, err := h.httpClient.Do(req) - if err == nil && resp.StatusCode == 200 { - resp.Body.Close() + err = h.producer.Publish("flat-killmail-queue", flatMsgBytes) + if err == nil { break } - if resp != nil { - resp.Body.Close() - } messagelog.Error("Failed to publish flattened killmail, retrying: %v", err) time.Sleep(1 * time.Second) } diff --git a/inserter.go b/inserter.go index be48639..9858c56 100644 --- a/inserter.go +++ b/inserter.go @@ -41,13 +41,13 @@ func runInserterStage() { consumer.AddHandler(handler) } - err = consumer.ConnectToNSQLookupd(nsqLookupHost) + err = consumer.ConnectToNSQLookupd(fmt.Sprintf("%s:%d", nsqLookupHost, nsqLookupPort)) if err != nil { logger.Error("Error connecting to NSQ lookupd: %v", err) return } - logger.Info("Connected to NSQ lookupd at %s", nsqLookupHost) + logger.Info("Connected to NSQ lookupd at %s:%d", nsqLookupHost, nsqLookupPort) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) diff --git a/mprocs.yaml b/mprocs.yaml index dfb0012..1c2505d 100644 --- a/mprocs.yaml +++ b/mprocs.yaml @@ -22,13 +22,17 @@ procs: autostart: false stop: send-keys: [""] + + dev-compose: + shell: "docker compose -f docker-compose-dev.yml up" + autostart: true + stop: + send-keys: [""] ingest-stage-file-reader: shell: "go run ." env: STAGE: "file-reader" - NSQ_PORT: "443" - NSQ_LOOKUP_PORT: "443" autostart: false stop: send-keys: [""] @@ -37,8 +41,6 @@ procs: shell: "go run ." env: STAGE: "flattener" - NSQ_PORT: "443" - NSQ_LOOKUP_PORT: "443" autostart: false stop: send-keys: [""] @@ -47,8 +49,6 @@ procs: shell: "go run ." env: STAGE: "inserter" - NSQ_PORT: "443" - NSQ_LOOKUP_PORT: "443" autostart: false stop: send-keys: [""]