Run nsq locally
http is a nightmare...
This commit is contained in:
@@ -6,6 +6,7 @@ frontend/
|
||||
|
||||
# Data directory (runtime data, not needed for build)
|
||||
data/
|
||||
nsq_data/
|
||||
|
||||
.cache
|
||||
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -7,3 +7,4 @@ dbgate-data
|
||||
clickhouse_data
|
||||
*.exe
|
||||
.cache
|
||||
nsq_data
|
||||
|
||||
10
config.go
10
config.go
@@ -51,10 +51,10 @@ var (
|
||||
func initConfig() error {
|
||||
godotenv.Load()
|
||||
|
||||
nsqHost = getEnv("NSQ_HOST", "nsq.site.quack-lab.dev")
|
||||
nsqPort = getEnvInt("NSQ_PORT", 4151)
|
||||
nsqLookupHost = getEnv("NSQ_LOOKUP_HOST", "nsqlookup.site.quack-lab.dev")
|
||||
nsqLookupPort = getEnvInt("NSQ_LOOKUP_PORT", 4161)
|
||||
nsqHost = getEnv("NSQ_HOST", "localhost")
|
||||
nsqPort = getEnvInt("NSQ_PORT", 4150)
|
||||
nsqLookupHost = getEnv("NSQ_LOOKUP_HOST", "localhost")
|
||||
nsqLookupPort = getEnvInt("NSQ_LOOKUP_PORT", 4160)
|
||||
|
||||
stage1Workers = getEnvInt("STAGE1_WORKERS", 6)
|
||||
stage2Workers = getEnvInt("STAGE2_WORKERS", 4)
|
||||
@@ -64,7 +64,7 @@ func initConfig() error {
|
||||
serverMode = getEnv("SERVER", "false") == "true"
|
||||
stage = getEnv("STAGE", "")
|
||||
|
||||
clickhouseHost = getEnv("CLICKHOUSE_HOST", "clickhouse.site.quack-lab.dev")
|
||||
clickhouseHost = getEnv("CLICKHOUSE_HOST", "localhost")
|
||||
clickhousePort = getEnvInt("CLICKHOUSE_PORT", 8123)
|
||||
clickhouseDatabase = getEnv("CLICKHOUSE_DATABASE", "zkill")
|
||||
clickhouseUsername = getEnv("CLICKHOUSE_USERNAME", "default")
|
||||
|
||||
@@ -1,6 +1,26 @@
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
nsqlookupd:
|
||||
image: nsqio/nsq:latest
|
||||
command: /nsqlookupd
|
||||
ports:
|
||||
- "4160:4160"
|
||||
- "4161:4161"
|
||||
volumes:
|
||||
- ./nsq_data:/data
|
||||
|
||||
nsqd:
|
||||
image: nsqio/nsq:latest
|
||||
command: /nsqd --broadcast-address=nsqd --lookupd-tcp-address=nsqlookupd:4160
|
||||
ports:
|
||||
- "4150:4150"
|
||||
- "4151:4151"
|
||||
volumes:
|
||||
- ./nsq_data:/data
|
||||
depends_on:
|
||||
- nsqlookupd
|
||||
|
||||
clickhouse:
|
||||
image: clickhouse/clickhouse-server:latest
|
||||
ports:
|
||||
@@ -13,26 +33,3 @@ services:
|
||||
- CLICKHOUSE_USER=default
|
||||
- CLICKHOUSE_PASSWORD=
|
||||
- CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1
|
||||
|
||||
backend:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile
|
||||
ports:
|
||||
- "3000:3000"
|
||||
volumes:
|
||||
- ./data:/data
|
||||
- ./sqlite-latest.sqlite:/sqlite-latest.sqlite
|
||||
environment:
|
||||
- PORT=3000
|
||||
depends_on:
|
||||
- clickhouse
|
||||
|
||||
frontend:
|
||||
build:
|
||||
context: ./frontend
|
||||
dockerfile: Dockerfile
|
||||
ports:
|
||||
- "8080:8080"
|
||||
depends_on:
|
||||
- backend
|
||||
@@ -1,11 +1,9 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
@@ -15,14 +13,18 @@ import (
|
||||
|
||||
logger "git.site.quack-lab.dev/dave/cylogger"
|
||||
utils "git.site.quack-lab.dev/dave/cyutils"
|
||||
"github.com/nsqio/go-nsq"
|
||||
)
|
||||
|
||||
func runFileReaderStage() {
|
||||
logger.Info("Starting file reader stage")
|
||||
|
||||
httpClient := &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), nsq.NewConfig())
|
||||
if err != nil {
|
||||
logger.Error("Error creating producer: %v", err)
|
||||
return
|
||||
}
|
||||
defer producer.Stop()
|
||||
|
||||
killmailFiles, err := os.ReadDir("data")
|
||||
if err != nil {
|
||||
@@ -63,8 +65,8 @@ func runFileReaderStage() {
|
||||
fileLog.Info("Processing file")
|
||||
|
||||
handler := &FileReaderHandler{
|
||||
httpClient: httpClient,
|
||||
workerID: worker,
|
||||
producer: producer,
|
||||
workerID: worker,
|
||||
}
|
||||
|
||||
err := handler.processFile(ctx, filePath)
|
||||
@@ -80,8 +82,8 @@ func runFileReaderStage() {
|
||||
}
|
||||
|
||||
type FileReaderHandler struct {
|
||||
httpClient *http.Client
|
||||
workerID int
|
||||
producer *nsq.Producer
|
||||
workerID int
|
||||
}
|
||||
|
||||
func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) error {
|
||||
@@ -108,23 +110,10 @@ func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) er
|
||||
}
|
||||
|
||||
for {
|
||||
url := fmt.Sprintf("https://%s/pub?topic=killmail-queue", nsqHost)
|
||||
req, err := http.NewRequest("POST", url, bytes.NewReader(killmailBytes))
|
||||
if err != nil {
|
||||
messagelog.Error("Failed to create request, retrying: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := h.httpClient.Do(req)
|
||||
if err == nil && resp.StatusCode == 200 {
|
||||
resp.Body.Close()
|
||||
err = h.producer.Publish("killmail-queue", killmailBytes)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if resp != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
messagelog.Error("Failed to publish killmail, retrying: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
43
flattener.go
43
flattener.go
@@ -1,11 +1,9 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
@@ -28,6 +26,7 @@ func runFlattenerStage() {
|
||||
config.MaxAttempts = 0
|
||||
config.MaxInFlight = stage2Workers
|
||||
config.MsgTimeout = 300 * time.Second
|
||||
config.LookupdPollInterval = 15 * time.Second
|
||||
|
||||
consumer, err := nsq.NewConsumer("killmail-queue", "flattener", config)
|
||||
if err != nil {
|
||||
@@ -35,26 +34,29 @@ func runFlattenerStage() {
|
||||
return
|
||||
}
|
||||
|
||||
httpClient := &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), nsq.NewConfig())
|
||||
if err != nil {
|
||||
logger.Error("Error creating producer: %v", err)
|
||||
return
|
||||
}
|
||||
defer producer.Stop()
|
||||
|
||||
for i := 0; i < stage2Workers; i++ {
|
||||
handler := &FlattenerHandler{
|
||||
db: db,
|
||||
httpClient: httpClient,
|
||||
workerID: i,
|
||||
db: db,
|
||||
producer: producer,
|
||||
workerID: i,
|
||||
}
|
||||
consumer.AddHandler(handler)
|
||||
}
|
||||
|
||||
err = consumer.ConnectToNSQLookupd(nsqLookupHost)
|
||||
err = consumer.ConnectToNSQLookupd(fmt.Sprintf("%s:%d", nsqLookupHost, nsqLookupPort))
|
||||
if err != nil {
|
||||
logger.Error("Error connecting to NSQ lookupd: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Connected to NSQ lookupd at %s", nsqLookupHost)
|
||||
logger.Info("Connected to NSQ lookupd at %s:%d", nsqLookupHost, nsqLookupPort)
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
@@ -70,9 +72,9 @@ func runFlattenerStage() {
|
||||
}
|
||||
|
||||
type FlattenerHandler struct {
|
||||
db DB
|
||||
httpClient *http.Client
|
||||
workerID int
|
||||
db DB
|
||||
producer *nsq.Producer
|
||||
workerID int
|
||||
}
|
||||
|
||||
func (h *FlattenerHandler) HandleMessage(message *nsq.Message) error {
|
||||
@@ -125,23 +127,10 @@ func (h *FlattenerHandler) HandleMessage(message *nsq.Message) error {
|
||||
}
|
||||
|
||||
for {
|
||||
url := fmt.Sprintf("https://%s/pub?topic=flat-killmail-queue", nsqHost)
|
||||
req, err := http.NewRequest("POST", url, bytes.NewReader(flatMsgBytes))
|
||||
if err != nil {
|
||||
messagelog.Error("Failed to create request, retrying: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := h.httpClient.Do(req)
|
||||
if err == nil && resp.StatusCode == 200 {
|
||||
resp.Body.Close()
|
||||
err = h.producer.Publish("flat-killmail-queue", flatMsgBytes)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if resp != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
messagelog.Error("Failed to publish flattened killmail, retrying: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
@@ -41,13 +41,13 @@ func runInserterStage() {
|
||||
consumer.AddHandler(handler)
|
||||
}
|
||||
|
||||
err = consumer.ConnectToNSQLookupd(nsqLookupHost)
|
||||
err = consumer.ConnectToNSQLookupd(fmt.Sprintf("%s:%d", nsqLookupHost, nsqLookupPort))
|
||||
if err != nil {
|
||||
logger.Error("Error connecting to NSQ lookupd: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Connected to NSQ lookupd at %s", nsqLookupHost)
|
||||
logger.Info("Connected to NSQ lookupd at %s:%d", nsqLookupHost, nsqLookupPort)
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
12
mprocs.yaml
12
mprocs.yaml
@@ -22,13 +22,17 @@ procs:
|
||||
autostart: false
|
||||
stop:
|
||||
send-keys: ["<C-c>"]
|
||||
|
||||
dev-compose:
|
||||
shell: "docker compose -f docker-compose-dev.yml up"
|
||||
autostart: true
|
||||
stop:
|
||||
send-keys: ["<C-c>"]
|
||||
|
||||
ingest-stage-file-reader:
|
||||
shell: "go run ."
|
||||
env:
|
||||
STAGE: "file-reader"
|
||||
NSQ_PORT: "443"
|
||||
NSQ_LOOKUP_PORT: "443"
|
||||
autostart: false
|
||||
stop:
|
||||
send-keys: ["<C-c>"]
|
||||
@@ -37,8 +41,6 @@ procs:
|
||||
shell: "go run ."
|
||||
env:
|
||||
STAGE: "flattener"
|
||||
NSQ_PORT: "443"
|
||||
NSQ_LOOKUP_PORT: "443"
|
||||
autostart: false
|
||||
stop:
|
||||
send-keys: ["<C-c>"]
|
||||
@@ -47,8 +49,6 @@ procs:
|
||||
shell: "go run ."
|
||||
env:
|
||||
STAGE: "inserter"
|
||||
NSQ_PORT: "443"
|
||||
NSQ_LOOKUP_PORT: "443"
|
||||
autostart: false
|
||||
stop:
|
||||
send-keys: ["<C-c>"]
|
||||
|
||||
Reference in New Issue
Block a user