diff --git a/pipeline/missing-attacker-character-name-reader/main.go b/pipeline/missing-attacker-character-name-reader/main.go index 133b930..32df9fa 100644 --- a/pipeline/missing-attacker-character-name-reader/main.go +++ b/pipeline/missing-attacker-character-name-reader/main.go @@ -4,9 +4,6 @@ import ( "context" "encoding/json" "fmt" - "os" - "os/signal" - "syscall" "time" "zkillsusser/clickhouse" @@ -53,15 +50,6 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - go func() { - <-sigChan - logger.Info("Shutdown signal received, stopping gracefully...") - cancel() - }() - ticker := time.NewTicker(PollInterval) defer ticker.Stop() diff --git a/pipeline/missing-item-type-reader/main.go b/pipeline/missing-item-type-reader/main.go index dcd983f..b1bd522 100644 --- a/pipeline/missing-item-type-reader/main.go +++ b/pipeline/missing-item-type-reader/main.go @@ -3,10 +3,7 @@ package main import ( "context" "fmt" - "os" - "os/signal" "strconv" - "syscall" "time" "zkillsusser/clickhouse" @@ -18,7 +15,7 @@ import ( ) const ( - BatchSize = 1000 + BatchSize = 100 Topic = "missing-item-type-queue" PollInterval = 30 * time.Second ) @@ -38,6 +35,7 @@ func main() { return } defer chClient.Close() + logger.Info("Connected to ClickHouse") producer, err := lnsq.NewProducer() if err != nil { @@ -45,19 +43,11 @@ func main() { return } defer producer.Stop() + logger.Info("Connected to NSQ") ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - go func() { - <-sigChan - logger.Info("Shutdown signal received, stopping gracefully...") - cancel() - }() - ticker := time.NewTicker(PollInterval) defer ticker.Stop() @@ -129,6 +119,7 @@ func processMissingItemTypes(ctx context.Context, chClient *clickhouse.Clickhous } func queryMissingItemTypes(ctx context.Context, chClient *clickhouse.ClickhouseClient) ([]int32, error) { + logger.Info("Querying missing item types") query := ` SELECT DISTINCT item_type_id FROM zkill.killmail_items diff --git a/pipeline/missing-item-type-resolver/main.go b/pipeline/missing-item-type-resolver/main.go index 9e24a26..1ee72f3 100644 --- a/pipeline/missing-item-type-resolver/main.go +++ b/pipeline/missing-item-type-resolver/main.go @@ -3,11 +3,8 @@ package main import ( "context" "fmt" - "os" - "os/signal" "strconv" "strings" - "syscall" "zkillsusser/clickhouse" "zkillsusser/config" @@ -112,11 +109,5 @@ func main() { logger.Info("Connected to NSQ, consuming from topic: %s, channel: %s", Topic, Channel) - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - <-sigChan - logger.Info("Shutdown signal received, stopping gracefully...") - logger.Info("Missing item type resolver completed") } diff --git a/pipeline/missing-victim-character-name-reader/main.go b/pipeline/missing-victim-character-name-reader/main.go index b17eaac..b58e29e 100644 --- a/pipeline/missing-victim-character-name-reader/main.go +++ b/pipeline/missing-victim-character-name-reader/main.go @@ -4,9 +4,6 @@ import ( "context" "encoding/json" "fmt" - "os" - "os/signal" - "syscall" "time" "zkillsusser/clickhouse" @@ -53,15 +50,6 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - go func() { - <-sigChan - logger.Info("Shutdown signal received, stopping gracefully...") - cancel() - }() - ticker := time.NewTicker(PollInterval) defer ticker.Stop() diff --git a/pipeline/nsq-to-clickhouse-reader/main.go b/pipeline/nsq-to-clickhouse-reader/main.go index b3d424e..4f367ac 100644 --- a/pipeline/nsq-to-clickhouse-reader/main.go +++ b/pipeline/nsq-to-clickhouse-reader/main.go @@ -2,10 +2,7 @@ package main import ( "encoding/json" - "os" - "os/signal" "sync" - "syscall" "time" "zkillsusser/clickhouse" @@ -176,11 +173,5 @@ func main() { logger.Info("Connected to NSQ, consuming from topic: %s, channel: %s", Topic, Channel) - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - <-sigChan - logger.Info("Shutdown signal received, stopping gracefully...") - logger.Info("Stage 2 completed") }