diff --git a/api/api.go b/api/api.go index 435d88b..4c7ba39 100644 --- a/api/api.go +++ b/api/api.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "time" + "zkillsusser/types" logger "git.site.quack-lab.dev/dave/cylogger" ) @@ -26,9 +27,9 @@ type APIAnalyticsRequest struct { } type APIModuleCoOccurrenceRequest struct { - Filters AnalyticsFilters `json:"filters"` - SelectedModuleID int32 `json:"selectedModuleID"` - SelectedSlot string `json:"selectedSlot"` + Filters AnalyticsFilters `json:"filters"` + SelectedModuleID int32 `json:"selectedModuleID"` + SelectedSlot string `json:"selectedSlot"` } type APISearchResult struct { @@ -210,8 +211,8 @@ func handleAnalyticsModuleCoOccurrence(w http.ResponseWriter, r *http.Request) { type APIKillmailIDsRequest struct { Filters types.AnalyticsFilters `json:"filters"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Limit int `json:"limit"` + Offset int `json:"offset"` } func handleAnalyticsKillmails(w http.ResponseWriter, r *http.Request) { diff --git a/config/config.go b/config/config.go index 6307a6d..86e849b 100644 --- a/config/config.go +++ b/config/config.go @@ -33,10 +33,6 @@ var ( NSQLookupHost string NSQLookupPort int - Stage1Workers int - Stage2Workers int - Stage3Workers int - ServerPort string ServerMode bool Stage string @@ -55,10 +51,6 @@ func InitConfig() error { NSQLookupHost = getEnv("NSQ_LOOKUP_HOST", "127.0.0.1") NSQLookupPort = getEnvInt("NSQ_LOOKUP_PORT", 4160) - Stage1Workers = getEnvInt("STAGE1_WORKERS", 24) - Stage2Workers = getEnvInt("STAGE2_WORKERS", 48) - Stage3Workers = getEnvInt("STAGE3_WORKERS", 48) - ServerPort = getEnv("SERVER_PORT", "3000") ServerMode = getEnv("SERVER", "false") == "true" Stage = getEnv("STAGE", "") diff --git a/main.go b/main.go index 5210e19..9bf9d19 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,6 @@ package main import ( "zkillsusser/api" "zkillsusser/config" - "zkillsusser/pipeline" logger "git.site.quack-lab.dev/dave/cylogger" ) @@ -22,8 +21,4 @@ func main() { return } - if config.Stage == "1" { - pipeline.RunStage1() - return - } } diff --git a/mprocs.yaml b/mprocs.yaml index a735b25..93e730f 100644 --- a/mprocs.yaml +++ b/mprocs.yaml @@ -20,154 +20,30 @@ procs: CLICKHOUSE_PASSWORD: "" stop: send-keys: [""] - + frontend: shell: "bun dev" cwd: "frontend" autostart: false stop: send-keys: [""] - + dev-compose: shell: "docker compose -f docker-compose-dev.yml up" autostart: true stop: send-keys: [""] - ingest-stage-file-reader: + esi-killmail-disk-reader: shell: "go run ." - env: - STAGE: "file-reader" + cwd: "pipeline/esi-killmail-disk-reader" autostart: false stop: send-keys: [""] - ingest-stage-flattener: + esi-killmail-flattener: shell: "go run ." - env: - STAGE: "flattener" - autostart: false - stop: - send-keys: [""] - - ingest-stage-inserter-1: - shell: "go run ." - env: - STAGE: "inserter" - CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev" - CLICKHOUSE_PORT: "80" - CLICKHOUSE_DATABASE: "zkill" - CLICKHOUSE_USERNAME: "default" - CLICKHOUSE_PASSWORD: "" - autostart: false - stop: - send-keys: [""] - - ingest-stage-inserter-2: - shell: "go run ." - env: - STAGE: "inserter" - CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev" - CLICKHOUSE_PORT: "80" - CLICKHOUSE_DATABASE: "zkill" - CLICKHOUSE_USERNAME: "default" - CLICKHOUSE_PASSWORD: "" - autostart: false - stop: - send-keys: [""] - - ingest-stage-inserter-3: - shell: "go run ." - env: - STAGE: "inserter" - CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev" - CLICKHOUSE_PORT: "80" - CLICKHOUSE_DATABASE: "zkill" - CLICKHOUSE_USERNAME: "default" - CLICKHOUSE_PASSWORD: "" - autostart: false - stop: - send-keys: [""] - - ingest-stage-inserter-4: - shell: "go run ." - env: - STAGE: "inserter" - CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev" - CLICKHOUSE_PORT: "80" - CLICKHOUSE_DATABASE: "zkill" - CLICKHOUSE_USERNAME: "default" - CLICKHOUSE_PASSWORD: "" - autostart: false - stop: - send-keys: [""] - - ingest-stage-inserter-5: - shell: "go run ." - env: - STAGE: "inserter" - CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev" - CLICKHOUSE_PORT: "80" - CLICKHOUSE_DATABASE: "zkill" - CLICKHOUSE_USERNAME: "default" - CLICKHOUSE_PASSWORD: "" - autostart: false - stop: - send-keys: [""] - - ingest-stage-inserter-6: - shell: "go run ." - env: - STAGE: "inserter" - CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev" - CLICKHOUSE_PORT: "80" - CLICKHOUSE_DATABASE: "zkill" - CLICKHOUSE_USERNAME: "default" - CLICKHOUSE_PASSWORD: "" - autostart: false - stop: - send-keys: [""] - - ingest-stage-inserter-7: - shell: "go run ." - env: - STAGE: "inserter" - CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev" - CLICKHOUSE_PORT: "80" - CLICKHOUSE_DATABASE: "zkill" - CLICKHOUSE_USERNAME: "default" - CLICKHOUSE_PASSWORD: "" - autostart: false - stop: - send-keys: [""] - - ingest-stage-inserter-8: - shell: "go run ." - env: - STAGE: "inserter" - CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev" - CLICKHOUSE_PORT: "80" - CLICKHOUSE_DATABASE: "zkill" - CLICKHOUSE_USERNAME: "default" - CLICKHOUSE_PASSWORD: "" - autostart: false - stop: - send-keys: [""] - - go-tidy: - shell: "go mod tidy" - autostart: false - stop: - send-keys: [""] - - gorm-gentool-install: - shell: "go install gorm.io/gen/tools/gentool@latest" - autostart: false - stop: - send-keys: [""] - - gorm-gentool-generate: - shell: 'gentool -db sqlite -dsn "sqlite-latest.sqlite" -outPath types -modelPkgName models -onlyModel' + cwd: "pipeline/esi-killmail-flattener" autostart: false stop: send-keys: [""] diff --git a/pipeline/esi-killmail-disk-reader.go b/pipeline/esi-killmail-disk-reader/main.go similarity index 83% rename from pipeline/esi-killmail-disk-reader.go rename to pipeline/esi-killmail-disk-reader/main.go index ccc19fd..36dcf77 100644 --- a/pipeline/esi-killmail-disk-reader.go +++ b/pipeline/esi-killmail-disk-reader/main.go @@ -1,4 +1,4 @@ -package pipeline +package main import ( "context" @@ -18,10 +18,21 @@ import ( "github.com/nsqio/go-nsq" ) -func RunStage1() { +const ( + Workers = 4 + DataDir = "data" +) + +func main() { + if err := config.InitConfig(); err != nil { + logger.Error("Failed to initialize config: %v", err) + return + } + + logger.InitFlag() logger.Info("Starting stage 1: reading from disk and writing to NSQ") - killmailFiles, err := os.ReadDir("data") + killmailFiles, err := os.ReadDir(DataDir) if err != nil { logger.Error("Failed to read data directory: %v", err) return @@ -30,7 +41,7 @@ func RunStage1() { var filesToProcess []string for _, file := range killmailFiles { if strings.HasSuffix(file.Name(), ".bz2") { - filesToProcess = append(filesToProcess, filepath.Join("data", file.Name())) + filesToProcess = append(filesToProcess, filepath.Join(DataDir, file.Name())) } } @@ -38,7 +49,7 @@ func RunStage1() { ctx := context.Background() - utils.WithWorkers(config.Stage1Workers, filesToProcess, func(worker int, index int, filePath string) { + utils.WithWorkers(Workers, filesToProcess, func(worker int, index int, filePath string) { if ctx.Err() != nil { return } diff --git a/types/flat_killmail.go b/types/flat_killmail.go index a8cdf7b..04461f0 100644 --- a/types/flat_killmail.go +++ b/types/flat_killmail.go @@ -120,31 +120,3 @@ func derefInt64(i *int64) int64 { } return *i } - -type ModuleFilter struct { - ModuleID int32 -} - -type AnalyticsFilters struct { - KillHour []uint8 - KillDayOfWeek []uint8 - KillDate []string - Month []string - SolarSystemID []int32 - RegionName []string - ConstellationName []string - SecurityStatus []string - VictimShipTypeID []int32 - VictimShipGroupName []string - VictimShipCategory []string - VictimCharacterName []string - VictimCorporation []string - VictimAlliance []string - AttackerShipType []string - AttackerShipGroup []string - AttackerCharacter []string - AttackerCorporation []string - AttackerAlliance []string - SlotType []string - HasModule *ModuleFilter -}