diff --git a/config.go b/config.go
new file mode 100644
index 0000000..46e0592
--- /dev/null
+++ b/config.go
@@ -0,0 +1,101 @@
+package main
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+
+	"github.com/joho/godotenv"
+)
+
+// getEnv returns the value of the environment variable key, or defaultValue
+// when the variable is unset or empty.
+func getEnv(key, defaultValue string) string {
+	value := os.Getenv(key)
+	if value == "" {
+		return defaultValue
+	}
+	return value
+}
+
+// getEnvInt returns the environment variable key parsed as a base-10 integer.
+// It returns defaultValue when the variable is unset, empty, or not a valid
+// integer.
+func getEnvInt(key string, defaultValue int) int {
+	value := os.Getenv(key)
+	if value == "" {
+		return defaultValue
+	}
+	parsed, err := strconv.Atoi(value)
+	if err != nil {
+		return defaultValue
+	}
+	return parsed
+}
+
+// Runtime configuration, populated by initConfig.
+var (
+	nsqHost       string
+	nsqPort       int
+	nsqLookupHost string
+	nsqLookupPort int
+
+	stage1Workers int
+	stage2Workers int
+	stage3Workers int
+
+	serverPort string
+	serverMode bool
+	stage      string
+
+	clickhouseHost     string
+	clickhousePort     int
+	clickhouseDatabase string
+	clickhouseUsername string
+	clickhousePassword string
+)
+
+// nsqLookupAddr returns the nsqlookupd address handed to ConnectToNSQLookupd,
+// which needs a port (or a full URL) and rejects a bare host name.
+func nsqLookupAddr() string {
+	// A value that already carries a port or is a URL is usable as it stands.
+	if strings.ContainsAny(nsqLookupHost, ":/") {
+		return nsqLookupHost
+	}
+	return fmt.Sprintf("%s:%d", nsqLookupHost, nsqLookupPort)
+}
+
+// initConfig loads an optional .env file from the working directory and then
+// fills the package-level configuration from the environment, using the
+// defaults below for anything that is unset.
+//
+// A missing .env file is not an error; any other failure to load it is
+// returned to the caller.
+func initConfig() error {
+	if err := godotenv.Load(); err != nil && !errors.Is(err, os.ErrNotExist) {
+		return fmt.Errorf("loading .env: %w", err)
+	}
+
+	nsqHost = getEnv("NSQ_HOST", "nsq.site.quack-lab.dev")
+	nsqPort = getEnvInt("NSQ_PORT", 4151)
+	nsqLookupHost = getEnv("NSQ_LOOKUP_HOST", "nsqlookup.site.quack-lab.dev")
+	nsqLookupPort = getEnvInt("NSQ_LOOKUP_PORT", 4161)
+
+	stage1Workers = getEnvInt("STAGE1_WORKERS", 6)
+	stage2Workers = getEnvInt("STAGE2_WORKERS", 4)
+	stage3Workers = getEnvInt("STAGE3_WORKERS", 4)
+
+	serverPort = getEnv("SERVER_PORT", "3000")
+	serverMode = getEnv("SERVER", "false") == "true"
+	stage = getEnv("STAGE", "")
+
+	clickhouseHost = getEnv("CLICKHOUSE_HOST", "clickhouse.site.quack-lab.dev")
+	clickhousePort = getEnvInt("CLICKHOUSE_PORT", 8123)
+	clickhouseDatabase = getEnv("CLICKHOUSE_DATABASE", "zkill")
+	clickhouseUsername = getEnv("CLICKHOUSE_USERNAME", "default")
+	clickhousePassword = getEnv("CLICKHOUSE_PASSWORD", "")
+
+	return nil
+}
diff --git a/db.go b/db.go
index 2413bf6..2ff4402 100644
--- a/db.go
+++ b/db.go
@@ -100,12 +100,13 @@ func GetDBSqlite() (*gorm.DB, error) {
 }
 
 func GetDBClickhouse() (driver.Conn, error) {
+	addr := fmt.Sprintf("%s:%d", clickhouseHost, clickhousePort)
 	options := &clickhouse.Options{
-		Addr: []string{"clickhouse.site.quack-lab.dev"},
+		Addr: []string{addr},
 		Auth: clickhouse.Auth{
-			Database: "zkill",
-			Username: "default",
-			Password: "",
+			Database: clickhouseDatabase,
+			Username: clickhouseUsername,
+			Password: clickhousePassword,
 		},
 		Protocol: clickhouse.HTTP,
 		Settings: clickhouse.Settings{
diff --git a/file_reader.go b/file_reader.go
index 65b4187..3001a16 100644
--- a/file_reader.go
+++ b/file_reader.go
@@ -1,28 +1,28 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
+	"net/http"
 	"os"
 	"os/signal"
 	"path/filepath"
 	"strings"
 	"syscall"
+	"time"
 
 	logger "git.site.quack-lab.dev/dave/cylogger"
-	"github.com/nsqio/go-nsq"
+	utils "git.site.quack-lab.dev/dave/cyutils"
 )
 
 func runFileReaderStage() {
 	logger.Info("Starting file reader stage")
 
-	producer, err := nsq.NewProducer(nsqdURL, nsq.NewConfig())
-	if err != nil {
-		logger.Error("Error creating producer: %v", err)
-		return
+	httpClient := &http.Client{
+		Timeout: 30 * time.Second,
 	}
-	defer producer.Stop()
 
 	killmailFiles, err := os.ReadDir("data")
 	if err != nil {
@@ -51,36 +51,37 @@ func runFileReaderStage() {
 		cancel()
 	}()
 
-	for i, filePath := range filesToProcess {
+	utils.WithWorkers(stage1Workers, filesToProcess, func(worker int, index int, filePath string) {
 		if ctx.Err() != nil {
-			break
+			return
 		}
 
 		fileLog := logger.Default.
-			WithPrefix(fmt.Sprintf("file %d of %d", i+1, len(filesToProcess))).
+			WithPrefix(fmt.Sprintf("worker %d", worker)).
+			WithPrefix(fmt.Sprintf("file %d of %d", index+1, len(filesToProcess))).
 			WithPrefix(filepath.Base(filePath))
 		fileLog.Info("Processing file")
 
 		handler := &FileReaderHandler{
-			producer: producer,
-			workerID: i % stage1Workers,
+			httpClient: httpClient,
+			workerID:   worker,
 		}
 
 		err := handler.processFile(ctx, filePath)
 		if err != nil {
 			fileLog.Error("Failed to process file: %v", err)
-			continue
+			return
 		}
 
 		fileLog.Info("Finished processing file")
-	}
+	})
 
 	logger.Info("File reader stage completed")
 }
 
 type FileReaderHandler struct {
-	producer *nsq.Producer
-	workerID int
+	httpClient *http.Client
+	workerID   int
 }
 
 func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) error {
@@ -106,10 +107,35 @@ func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) er
 			continue
 		}
 
-		err = h.producer.Publish("killmail-queue", killmailBytes)
-		if err != nil {
-			messagelog.Error("Failed to publish killmail: %v", err)
-			return err
+		// Publish over nsqd's HTTP API, retrying until it succeeds or ctx is cancelled.
+		pubURL := fmt.Sprintf("https://%s/pub?topic=killmail-queue", nsqHost)
+		for {
+			req, err := http.NewRequestWithContext(ctx, http.MethodPost, pubURL, bytes.NewReader(killmailBytes))
+			if err != nil {
+				// Building the request fails the same way every time, so retrying cannot help.
+				messagelog.Error("Failed to create request: %v", err)
+				return err
+			}
+			req.Header.Set("Content-Type", "application/json")
+
+			resp, err := h.httpClient.Do(req)
+			if err == nil {
+				status := resp.StatusCode
+				resp.Body.Close()
+				if status == http.StatusOK {
+					break
+				}
+				// Give the retry log a real cause instead of a nil error.
+				err = fmt.Errorf("unexpected status %d", status)
+			}
+			messagelog.Error("Failed to publish killmail, retrying: %v", err)
+
+			// Wait before retrying, but stop promptly on shutdown.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-time.After(1 * time.Second):
+			}
 		}
 	}
 
diff --git a/flattener.go b/flattener.go
index 7586a75..e7390fc 100644
--- a/flattener.go
+++ b/flattener.go
@@ -1,9 +1,11 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
+	"net/http"
 	"os"
 	"os/signal"
 	"syscall"
@@ -33,29 +35,26 @@ func runFlattenerStage() {
 		return
 	}
 
-	producer, err := nsq.NewProducer(nsqdURL, nsq.NewConfig())
-	if err != nil {
-		logger.Error("Error creating producer: %v", err)
-		return
+	httpClient := &http.Client{
+		Timeout: 30 * time.Second,
 	}
-	defer producer.Stop()
 
 	for i := 0; i < stage2Workers; i++ {
 		handler := &FlattenerHandler{
-			db:       db,
-			producer: producer,
-			workerID: i,
+			db:         db,
+			httpClient: httpClient,
+			workerID:   i,
 		}
 		consumer.AddHandler(handler)
 	}
 
-	err = consumer.ConnectToNSQLookupd(nsqLookupdURL)
+	err = consumer.ConnectToNSQLookupd(nsqLookupAddr())
 	if err != nil {
 		logger.Error("Error connecting to NSQ lookupd: %v", err)
 		return
 	}
 
-	logger.Info("Connected to NSQ lookupd at %s", nsqLookupdURL)
+	logger.Info("Connected to NSQ lookupd at %s", nsqLookupAddr())
 
 	sigChan := make(chan os.Signal, 1)
 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
@@ -71,9 +70,9 @@ func runFlattenerStage() {
 }
 
 type FlattenerHandler struct {
-	db       DB
-	producer *nsq.Producer
-	workerID int
+	db         DB
+	httpClient *http.Client
+	workerID   int
 }
 
 func (h *FlattenerHandler) HandleMessage(message *nsq.Message) error {
@@ -125,10 +124,29 @@ func (h *FlattenerHandler) HandleMessage(message *nsq.Message) error {
 		return err
 	}
 
-	err = h.producer.Publish("flat-killmail-queue", flatMsgBytes)
-	if err != nil {
-		messagelog.Error("Failed to publish flattened killmail: %v", err)
-		return err
+	// Publish over nsqd's HTTP API, retrying until it succeeds.
+	pubURL := fmt.Sprintf("https://%s/pub?topic=flat-killmail-queue", nsqHost)
+	for {
+		req, err := http.NewRequest(http.MethodPost, pubURL, bytes.NewReader(flatMsgBytes))
+		if err != nil {
+			// Building the request fails the same way every time, so retrying cannot help.
+			messagelog.Error("Failed to create request: %v", err)
+			return err
+		}
+		req.Header.Set("Content-Type", "application/json")
+
+		resp, err := h.httpClient.Do(req)
+		if err == nil {
+			status := resp.StatusCode
+			resp.Body.Close()
+			if status == http.StatusOK {
+				break
+			}
+			// Give the retry log a real cause instead of a nil error.
+			err = fmt.Errorf("unexpected status %d", status)
+		}
+		messagelog.Error("Failed to publish flattened killmail, retrying: %v", err)
+		time.Sleep(1 * time.Second)
 	}
 
 	messagelog.Info("Published flattened killmail with %d attackers and %d items", len(flatAttackers), len(flatItems))
diff --git a/go.mod b/go.mod
index c574a00..ef00696 100644
--- a/go.mod
+++ b/go.mod
@@ -28,6 +28,7 @@ require (
 	github.com/hexops/valast v1.5.0 // indirect
 	github.com/jinzhu/inflection v1.0.0 // indirect
 	github.com/jinzhu/now v1.1.5 // indirect
+	github.com/joho/godotenv v1.5.1
 	github.com/klauspost/compress v1.18.0 // indirect
 	github.com/mattn/go-sqlite3 v1.14.22 // indirect
 	github.com/paulmach/orb v0.12.0 // indirect
diff --git a/go.sum b/go.sum
index 0c33045..5d1e231 100644
--- a/go.sum
+++ b/go.sum
@@ -39,6 +39,8 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD
 github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
 github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
 github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
+github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
+github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
diff --git a/inserter.go b/inserter.go
index de8786d..be48639 100644
--- a/inserter.go
+++ b/inserter.go
@@ -41,13 +41,13 @@ func runInserterStage() {
 		consumer.AddHandler(handler)
 	}
 
-	err = consumer.ConnectToNSQLookupd(nsqLookupdURL)
+	err = consumer.ConnectToNSQLookupd(nsqLookupAddr())
 	if err != nil {
 		logger.Error("Error connecting to NSQ lookupd: %v", err)
 		return
 	}
 
-	logger.Info("Connected to NSQ lookupd at %s", nsqLookupdURL)
+	logger.Info("Connected to NSQ lookupd at %s", nsqLookupAddr())
 
 	sigChan := make(chan os.Signal, 1)
 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
diff --git a/main.go b/main.go
index 3fdf30c..8ba6117 100644
--- a/main.go
+++ b/main.go
@@ -1,20 +1,9 @@
 package main
 
 import (
-	"flag"
-
 	logger "git.site.quack-lab.dev/dave/cylogger"
 )
 
-const (
-	nsqLookupdURL = "nsqlookup.site.quack-lab.dev:4161"
-	nsqdURL       = "nsq.site.quack-lab.dev:4150"
-
-	stage1Workers = 6
-	stage2Workers = 4
-	stage3Workers = 4
-)
-
 type FlatKillmailMessage struct {
 	Killmail  *FlatKillmail          `json:"killmail"`
 	Attackers []FlatKillmailAttacker `json:"attackers"`
@@ -22,25 +11,26 @@ type FlatKillmailMessage struct {
 }
 
 func main() {
-	server := flag.Bool("server", false, "start API server")
-	port := flag.String("port", "3000", "API server port")
-	stage := flag.String("stage", "", "run specific stage: file-reader, flattener, inserter")
-	flag.Parse()
+	if err := initConfig(); err != nil {
+		logger.Error("Failed to initialize config: %v", err)
+		return
+	}
+
 	logger.InitFlag()
 	logger.Default = logger.Default.ToFile("zkill.log")
 	logger.Info("Starting")
 
-	if *server {
-		StartAPIServer(*port)
+	if serverMode {
+		StartAPIServer(serverPort)
 		return
 	}
 
-	if *stage != "" {
-		runStage(*stage)
+	if stage != "" {
+		runStage(stage)
 		return
 	}
 
-	logger.Error("No action specified. Use -server or -stage")
+	logger.Error("No action specified. Set SERVER=true or STAGE=file-reader|flattener|inserter")
 }
 
 func runStage(stage string) {
diff --git a/mprocs.yaml b/mprocs.yaml
index 1bd5a67..dfb0012 100644
--- a/mprocs.yaml
+++ b/mprocs.yaml
@@ -9,8 +9,10 @@ procs:
       send-keys: [""]
 
   server:
-    shell: "go run . -server"
+    shell: "go run ."
     autostart: false
+    env:
+      SERVER: "true"
     stop:
       send-keys: [""]
 
@@ -22,19 +24,31 @@ procs:
       send-keys: [""]
 
   ingest-stage-file-reader:
-    shell: "go run . -stage file-reader"
+    shell: "go run ."
+    env:
+      STAGE: "file-reader"
+      NSQ_PORT: "443"
+      NSQ_LOOKUP_PORT: "443"
     autostart: false
     stop:
       send-keys: [""]
 
   ingest-stage-flattener:
-    shell: "go run . -stage flattener"
+    shell: "go run ."
+    env:
+      STAGE: "flattener"
+      NSQ_PORT: "443"
+      NSQ_LOOKUP_PORT: "443"
     autostart: false
     stop:
       send-keys: [""]
 
   ingest-stage-inserter:
-    shell: "go run . -stage inserter"
+    shell: "go run ."
+    env:
+      STAGE: "inserter"
+      NSQ_PORT: "443"
+      NSQ_LOOKUP_PORT: "443"
     autostart: false
     stop:
       send-keys: [""]