Make each pipeline stage a standalone program

2026-01-26 09:40:44 +01:00
parent e2e1ebe48f
commit 1bc05dc81d
6 changed files with 28 additions and 181 deletions


@@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"time"
"zkillsusser/types"
logger "git.site.quack-lab.dev/dave/cylogger"
)
@@ -26,9 +27,9 @@ type APIAnalyticsRequest struct {
}
type APIModuleCoOccurrenceRequest struct {
Filters AnalyticsFilters `json:"filters"`
SelectedModuleID int32 `json:"selectedModuleID"`
SelectedSlot string `json:"selectedSlot"`
Filters AnalyticsFilters `json:"filters"`
SelectedModuleID int32 `json:"selectedModuleID"`
SelectedSlot string `json:"selectedSlot"`
}
type APISearchResult struct {
@@ -210,8 +211,8 @@ func handleAnalyticsModuleCoOccurrence(w http.ResponseWriter, r *http.Request) {
type APIKillmailIDsRequest struct {
Filters types.AnalyticsFilters `json:"filters"`
Limit int `json:"limit"`
Offset int `json:"offset"`
Limit int `json:"limit"`
Offset int `json:"offset"`
}
func handleAnalyticsKillmails(w http.ResponseWriter, r *http.Request) {
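
Reading these hunks together, the filter structs appear to have moved into the shared zkillsusser/types package (hence the new import and the types.AnalyticsFilters field). A minimal sketch of how handleAnalyticsKillmails might decode such a request, assuming encoding/json and a hypothetical queryKillmailIDs helper, neither of which this diff shows:

// A minimal sketch, not the actual handler body from this commit; it assumes
// encoding/json and net/http are already imported in this file.
func handleAnalyticsKillmails(w http.ResponseWriter, r *http.Request) {
	var req APIKillmailIDsRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid request body", http.StatusBadRequest)
		return
	}
	// req.Filters is the shared types.AnalyticsFilters value; queryKillmailIDs
	// is a hypothetical stand-in for whatever the real handler calls.
	ids, err := queryKillmailIDs(r.Context(), req.Filters, req.Limit, req.Offset)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	json.NewEncoder(w).Encode(ids)
}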


@@ -33,10 +33,6 @@ var (
NSQLookupHost string
NSQLookupPort int
Stage1Workers int
Stage2Workers int
Stage3Workers int
ServerPort string
ServerMode bool
Stage string
@@ -55,10 +51,6 @@ func InitConfig() error {
NSQLookupHost = getEnv("NSQ_LOOKUP_HOST", "127.0.0.1")
NSQLookupPort = getEnvInt("NSQ_LOOKUP_PORT", 4160)
Stage1Workers = getEnvInt("STAGE1_WORKERS", 24)
Stage2Workers = getEnvInt("STAGE2_WORKERS", 48)
Stage3Workers = getEnvInt("STAGE3_WORKERS", 48)
ServerPort = getEnv("SERVER_PORT", "3000")
ServerMode = getEnv("SERVER", "false") == "true"
Stage = getEnv("STAGE", "")
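
The hunk drops the Stage2Workers and Stage3Workers knobs but keeps relying on getEnv and getEnvInt, which are not shown here. A plausible sketch of those helpers, assuming they simply wrap os.LookupEnv and strconv.Atoi with a default fallback:

package config

import (
	"os"
	"strconv"
)

// Hypothetical implementations of the helpers referenced above; the real ones
// are not part of this diff.
func getEnv(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok {
		return v
	}
	return fallback
}

func getEnvInt(key string, fallback int) int {
	if v, ok := os.LookupEnv(key); ok {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return fallback
}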


@@ -3,7 +3,6 @@ package main
import (
"zkillsusser/api"
"zkillsusser/config"
"zkillsusser/pipeline"
logger "git.site.quack-lab.dev/dave/cylogger"
)
@@ -22,8 +21,4 @@ func main() {
return
}
if config.Stage == "1" {
pipeline.RunStage1()
return
}
}
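
With the STAGE dispatch gone, main.go presumably shrinks to config setup plus the API server. A rough sketch under that assumption; api.Serve is a placeholder name, not necessarily what the api package actually exports:

package main

import (
	"zkillsusser/api"
	"zkillsusser/config"

	logger "git.site.quack-lab.dev/dave/cylogger"
)

func main() {
	if err := config.InitConfig(); err != nil {
		logger.Error("Failed to initialize config: %v", err)
		return
	}
	// api.Serve is a hypothetical entry point; the api package's real exported
	// function is not visible in this diff. Only the overall shape matters here.
	if config.ServerMode {
		api.Serve(config.ServerPort)
	}
}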


@@ -20,154 +20,30 @@ procs:
CLICKHOUSE_PASSWORD: ""
stop:
send-keys: ["<C-c>"]
frontend:
shell: "bun dev"
cwd: "frontend"
autostart: false
stop:
send-keys: ["<C-c>"]
dev-compose:
shell: "docker compose -f docker-compose-dev.yml up"
autostart: true
stop:
send-keys: ["<C-c>"]
ingest-stage-file-reader:
esi-killmail-disk-reader:
shell: "go run ."
env:
STAGE: "file-reader"
cwd: "pipeline/esi-killmail-disk-reader"
autostart: false
stop:
send-keys: ["<C-c>"]
ingest-stage-flattener:
esi-killmail-flattener:
shell: "go run ."
env:
STAGE: "flattener"
autostart: false
stop:
send-keys: ["<C-c>"]
ingest-stage-inserter-1:
shell: "go run ."
env:
STAGE: "inserter"
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
CLICKHOUSE_PORT: "80"
CLICKHOUSE_DATABASE: "zkill"
CLICKHOUSE_USERNAME: "default"
CLICKHOUSE_PASSWORD: ""
autostart: false
stop:
send-keys: ["<C-c>"]
ingest-stage-inserter-2:
shell: "go run ."
env:
STAGE: "inserter"
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
CLICKHOUSE_PORT: "80"
CLICKHOUSE_DATABASE: "zkill"
CLICKHOUSE_USERNAME: "default"
CLICKHOUSE_PASSWORD: ""
autostart: false
stop:
send-keys: ["<C-c>"]
ingest-stage-inserter-3:
shell: "go run ."
env:
STAGE: "inserter"
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
CLICKHOUSE_PORT: "80"
CLICKHOUSE_DATABASE: "zkill"
CLICKHOUSE_USERNAME: "default"
CLICKHOUSE_PASSWORD: ""
autostart: false
stop:
send-keys: ["<C-c>"]
ingest-stage-inserter-4:
shell: "go run ."
env:
STAGE: "inserter"
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
CLICKHOUSE_PORT: "80"
CLICKHOUSE_DATABASE: "zkill"
CLICKHOUSE_USERNAME: "default"
CLICKHOUSE_PASSWORD: ""
autostart: false
stop:
send-keys: ["<C-c>"]
ingest-stage-inserter-5:
shell: "go run ."
env:
STAGE: "inserter"
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
CLICKHOUSE_PORT: "80"
CLICKHOUSE_DATABASE: "zkill"
CLICKHOUSE_USERNAME: "default"
CLICKHOUSE_PASSWORD: ""
autostart: false
stop:
send-keys: ["<C-c>"]
ingest-stage-inserter-6:
shell: "go run ."
env:
STAGE: "inserter"
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
CLICKHOUSE_PORT: "80"
CLICKHOUSE_DATABASE: "zkill"
CLICKHOUSE_USERNAME: "default"
CLICKHOUSE_PASSWORD: ""
autostart: false
stop:
send-keys: ["<C-c>"]
ingest-stage-inserter-7:
shell: "go run ."
env:
STAGE: "inserter"
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
CLICKHOUSE_PORT: "80"
CLICKHOUSE_DATABASE: "zkill"
CLICKHOUSE_USERNAME: "default"
CLICKHOUSE_PASSWORD: ""
autostart: false
stop:
send-keys: ["<C-c>"]
ingest-stage-inserter-8:
shell: "go run ."
env:
STAGE: "inserter"
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
CLICKHOUSE_PORT: "80"
CLICKHOUSE_DATABASE: "zkill"
CLICKHOUSE_USERNAME: "default"
CLICKHOUSE_PASSWORD: ""
autostart: false
stop:
send-keys: ["<C-c>"]
go-tidy:
shell: "go mod tidy"
autostart: false
stop:
send-keys: ["<C-c>"]
gorm-gentool-install:
shell: "go install gorm.io/gen/tools/gentool@latest"
autostart: false
stop:
send-keys: ["<C-c>"]
gorm-gentool-generate:
shell: 'gentool -db sqlite -dsn "sqlite-latest.sqlite" -outPath types -modelPkgName models -onlyModel'
cwd: "pipeline/esi-killmail-flattener"
autostart: false
stop:
send-keys: ["<C-c>"]


@@ -1,4 +1,4 @@
package pipeline
package main
import (
"context"
@@ -18,10 +18,21 @@ import (
"github.com/nsqio/go-nsq"
)
func RunStage1() {
const (
Workers = 4
DataDir = "data"
)
func main() {
if err := config.InitConfig(); err != nil {
logger.Error("Failed to initialize config: %v", err)
return
}
logger.InitFlag()
logger.Info("Starting stage 1: reading from disk and writing to NSQ")
killmailFiles, err := os.ReadDir("data")
killmailFiles, err := os.ReadDir(DataDir)
if err != nil {
logger.Error("Failed to read data directory: %v", err)
return
@@ -30,7 +41,7 @@ func RunStage1() {
var filesToProcess []string
for _, file := range killmailFiles {
if strings.HasSuffix(file.Name(), ".bz2") {
filesToProcess = append(filesToProcess, filepath.Join("data", file.Name()))
filesToProcess = append(filesToProcess, filepath.Join(DataDir, file.Name()))
}
}
@@ -38,7 +49,7 @@ func RunStage1() {
ctx := context.Background()
utils.WithWorkers(config.Stage1Workers, filesToProcess, func(worker int, index int, filePath string) {
utils.WithWorkers(Workers, filesToProcess, func(worker int, index int, filePath string) {
if ctx.Err() != nil {
return
}
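
utils.WithWorkers is only called here, not defined in this diff. A plausible sketch of such a fan-out helper, with the signature inferred from the call site above and the implementation assumed:

package utils

import "sync"

// WithWorkers is a guess at the helper's shape: it fans the items out to
// `workers` goroutines and blocks until every callback has returned. The
// signature is inferred from WithWorkers(Workers, filesToProcess,
// func(worker, index int, filePath string) {...}); the body is an assumption.
func WithWorkers[T any](workers int, items []T, fn func(worker int, index int, item T)) {
	jobs := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for i := range jobs {
				fn(worker, i, items[i])
			}
		}(w)
	}
	for i := range items {
		jobs <- i
	}
	close(jobs)
	wg.Wait()
}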


@@ -120,31 +120,3 @@ func derefInt64(i *int64) int64 {
}
return *i
}
type ModuleFilter struct {
ModuleID int32
}
type AnalyticsFilters struct {
KillHour []uint8
KillDayOfWeek []uint8
KillDate []string
Month []string
SolarSystemID []int32
RegionName []string
ConstellationName []string
SecurityStatus []string
VictimShipTypeID []int32
VictimShipGroupName []string
VictimShipCategory []string
VictimCharacterName []string
VictimCorporation []string
VictimAlliance []string
AttackerShipType []string
AttackerShipGroup []string
AttackerCharacter []string
AttackerCorporation []string
AttackerAlliance []string
SlotType []string
HasModule *ModuleFilter
}
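
These types are deleted here rather than edited; together with the new zkillsusser/types import and the types.AnalyticsFilters field in the API file, they most likely just moved into the shared types package. A sketch of that relocated definition, with the field set copied from the deleted block and only the package placement assumed:

package types

// ModuleFilter and AnalyticsFilters as they would appear after the move; the
// fields mirror the deleted declarations above.
type ModuleFilter struct {
	ModuleID int32
}

type AnalyticsFilters struct {
	KillHour            []uint8
	KillDayOfWeek       []uint8
	KillDate            []string
	Month               []string
	SolarSystemID       []int32
	RegionName          []string
	ConstellationName   []string
	SecurityStatus      []string
	VictimShipTypeID    []int32
	VictimShipGroupName []string
	VictimShipCategory  []string
	VictimCharacterName []string
	VictimCorporation   []string
	VictimAlliance      []string
	AttackerShipType    []string
	AttackerShipGroup   []string
	AttackerCharacter   []string
	AttackerCorporation []string
	AttackerAlliance    []string
	SlotType            []string
	HasModule           *ModuleFilter
}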