Add item type reader
This commit is contained in:
12
mprocs.yaml
12
mprocs.yaml
@@ -52,3 +52,15 @@ procs:
|
||||
autostart: false
|
||||
stop:
|
||||
send-keys: ["<C-c>"]
|
||||
|
||||
missing-item-type-reader:
|
||||
shell: "go run ."
|
||||
cwd: "pipeline/missing-item-type-reader"
|
||||
env:
|
||||
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
|
||||
CLICKHOUSE_DATABASE: "zkill"
|
||||
CLICKHOUSE_USERNAME: "default"
|
||||
CLICKHOUSE_PASSWORD: ""
|
||||
autostart: false
|
||||
stop:
|
||||
send-keys: ["<C-c>"]
|
||||
|
||||
155
pipeline/missing-item-type-reader/main.go
Normal file
155
pipeline/missing-item-type-reader/main.go
Normal file
@@ -0,0 +1,155 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"zkillsusser/clickhouse"
|
||||
"zkillsusser/config"
|
||||
"zkillsusser/lnsq"
|
||||
|
||||
logger "git.site.quack-lab.dev/dave/cylogger"
|
||||
"github.com/nsqio/go-nsq"
|
||||
)
|
||||
|
||||
const (
|
||||
BatchSize = 1000
|
||||
Topic = "missing-item-type-queue"
|
||||
)
|
||||
|
||||
type ItemTypeIDMessage struct {
|
||||
ItemTypeID int32 `json:"item_type_id"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := config.InitConfig(); err != nil {
|
||||
logger.Error("Failed to initialize config: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.InitFlag()
|
||||
logger.Info("Starting missing item type reader: reading from ClickHouse and writing to NSQ")
|
||||
|
||||
chClient, err := clickhouse.NewClient()
|
||||
if err != nil {
|
||||
logger.Error("Failed to create ClickHouse client: %v", err)
|
||||
return
|
||||
}
|
||||
defer chClient.Close()
|
||||
|
||||
producer, err := lnsq.NewProducer()
|
||||
if err != nil {
|
||||
logger.Error("Failed to create NSQ producer: %v", err)
|
||||
return
|
||||
}
|
||||
defer producer.Stop()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
<-sigChan
|
||||
logger.Info("Shutdown signal received, stopping gracefully...")
|
||||
cancel()
|
||||
}()
|
||||
|
||||
if err := processMissingItemTypes(ctx, chClient, producer); err != nil {
|
||||
logger.Error("Failed to process missing item types: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Missing item type reader completed")
|
||||
}
|
||||
|
||||
func processMissingItemTypes(ctx context.Context, chClient *clickhouse.Client, producer *nsq.Producer) error {
|
||||
itemTypeIDs, err := queryMissingItemTypes(ctx, chClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query missing item types: %w", err)
|
||||
}
|
||||
|
||||
if len(itemTypeIDs) == 0 {
|
||||
logger.Info("No missing item types found")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Found %d distinct missing item type IDs, publishing in batches", len(itemTypeIDs))
|
||||
|
||||
published := 0
|
||||
for i := 0; i < len(itemTypeIDs); i += BatchSize {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
end := i + BatchSize
|
||||
if end > len(itemTypeIDs) {
|
||||
end = len(itemTypeIDs)
|
||||
}
|
||||
|
||||
batch := itemTypeIDs[i:end]
|
||||
for _, itemTypeID := range batch {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
message := ItemTypeIDMessage{
|
||||
ItemTypeID: itemTypeID,
|
||||
}
|
||||
|
||||
messageBytes, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
logger.Error("Failed to marshal item type ID message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
for {
|
||||
err = producer.Publish(Topic, messageBytes)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
logger.Error("Failed to publish item type ID, retrying: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
published++
|
||||
}
|
||||
|
||||
logger.Info("Published batch of %d item type IDs (total: %d/%d)", len(batch), published, len(itemTypeIDs))
|
||||
}
|
||||
|
||||
logger.Info("Published all %d missing item type IDs", published)
|
||||
return nil
|
||||
}
|
||||
|
||||
func queryMissingItemTypes(ctx context.Context, chClient *clickhouse.Client) ([]int32, error) {
|
||||
query := `
|
||||
SELECT DISTINCT item_type_id
|
||||
FROM zkill.killmail_items
|
||||
WHERE item_type_name = '' OR item_type_name IS NULL
|
||||
ORDER BY item_type_id
|
||||
`
|
||||
|
||||
rows, err := chClient.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute query: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var itemTypeIDs []int32
|
||||
for rows.Next() {
|
||||
var itemTypeID int32
|
||||
if err := rows.Scan(&itemTypeID); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan row: %w", err)
|
||||
}
|
||||
itemTypeIDs = append(itemTypeIDs, itemTypeID)
|
||||
}
|
||||
|
||||
return itemTypeIDs, rows.Err()
|
||||
}
|
||||
Reference in New Issue
Block a user