Add character name readers
This commit is contained in:
167
pipeline/missing-attacker-character-name-reader/main.go
Normal file
167
pipeline/missing-attacker-character-name-reader/main.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"zkillsusser/clickhouse"
|
||||
"zkillsusser/config"
|
||||
"zkillsusser/lnsq"
|
||||
|
||||
logger "git.site.quack-lab.dev/dave/cylogger"
|
||||
"github.com/nsqio/go-nsq"
|
||||
)
|
||||
|
||||
const (
|
||||
BatchSize = 1000
|
||||
Topic = "missing-attacker-character-name-queue"
|
||||
PollInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
type AttackerCharacterIDMessage struct {
|
||||
CharacterID int64 `json:"character_id"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := config.InitConfig(); err != nil {
|
||||
logger.Error("Failed to initialize config: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.InitFlag()
|
||||
logger.Info("Starting missing attacker character name reader: reading from ClickHouse and writing to NSQ")
|
||||
|
||||
chClient, err := clickhouse.NewClient()
|
||||
if err != nil {
|
||||
logger.Error("Failed to create ClickHouse client: %v", err)
|
||||
return
|
||||
}
|
||||
defer chClient.Close()
|
||||
|
||||
producer, err := lnsq.NewProducer()
|
||||
if err != nil {
|
||||
logger.Error("Failed to create NSQ producer: %v", err)
|
||||
return
|
||||
}
|
||||
defer producer.Stop()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
<-sigChan
|
||||
logger.Info("Shutdown signal received, stopping gracefully...")
|
||||
cancel()
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(PollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info("Missing attacker character name reader shutting down")
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := processMissingAttackerCharacterNames(ctx, chClient, producer); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
logger.Error("Failed to process missing attacker character names: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processMissingAttackerCharacterNames(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer) error {
|
||||
characterIDs, err := queryMissingAttackerCharacterNames(ctx, chClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query missing attacker character names: %w", err)
|
||||
}
|
||||
|
||||
if len(characterIDs) == 0 {
|
||||
logger.Info("No missing attacker character names found")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Found %d distinct missing attacker character IDs, publishing in batches", len(characterIDs))
|
||||
|
||||
published := 0
|
||||
for i := 0; i < len(characterIDs); i += BatchSize {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
end := i + BatchSize
|
||||
if end > len(characterIDs) {
|
||||
end = len(characterIDs)
|
||||
}
|
||||
|
||||
batch := characterIDs[i:end]
|
||||
for _, characterID := range batch {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
message := AttackerCharacterIDMessage{
|
||||
CharacterID: characterID,
|
||||
}
|
||||
|
||||
messageBytes, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
logger.Error("Failed to marshal attacker character ID message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
for {
|
||||
err = producer.Publish(Topic, messageBytes)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
logger.Error("Failed to publish attacker character ID, retrying: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
published++
|
||||
}
|
||||
|
||||
logger.Info("Published batch of %d attacker character IDs (total: %d/%d)", len(batch), published, len(characterIDs))
|
||||
}
|
||||
|
||||
logger.Info("Published all %d missing attacker character IDs", published)
|
||||
return nil
|
||||
}
|
||||
|
||||
func queryMissingAttackerCharacterNames(ctx context.Context, chClient *clickhouse.ClickhouseClient) ([]int64, error) {
|
||||
query := `
|
||||
SELECT DISTINCT character_id
|
||||
FROM zkill.killmail_attackers
|
||||
WHERE character_name = '' OR character_name IS NULL
|
||||
ORDER BY character_id
|
||||
`
|
||||
|
||||
rows, err := chClient.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute query: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var characterIDs []int64
|
||||
for rows.Next() {
|
||||
var characterID int64
|
||||
if err := rows.Scan(&characterID); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan row: %w", err)
|
||||
}
|
||||
characterIDs = append(characterIDs, characterID)
|
||||
}
|
||||
|
||||
return characterIDs, rows.Err()
|
||||
}
|
||||
@@ -18,8 +18,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
BatchSize = 1000
|
||||
Topic = "missing-item-type-queue"
|
||||
BatchSize = 1000
|
||||
Topic = "missing-item-type-queue"
|
||||
PollInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
type ItemTypeIDMessage struct {
|
||||
@@ -61,12 +62,23 @@ func main() {
|
||||
cancel()
|
||||
}()
|
||||
|
||||
if err := processMissingItemTypes(ctx, chClient, producer); err != nil {
|
||||
logger.Error("Failed to process missing item types: %v", err)
|
||||
return
|
||||
}
|
||||
ticker := time.NewTicker(PollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
logger.Info("Missing item type reader completed")
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info("Missing item type reader shutting down")
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := processMissingItemTypes(ctx, chClient, producer); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
logger.Error("Failed to process missing item types: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processMissingItemTypes(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer) error {
|
||||
|
||||
167
pipeline/missing-victim-character-name-reader/main.go
Normal file
167
pipeline/missing-victim-character-name-reader/main.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"zkillsusser/clickhouse"
|
||||
"zkillsusser/config"
|
||||
"zkillsusser/lnsq"
|
||||
|
||||
logger "git.site.quack-lab.dev/dave/cylogger"
|
||||
"github.com/nsqio/go-nsq"
|
||||
)
|
||||
|
||||
const (
|
||||
BatchSize = 1000
|
||||
Topic = "missing-victim-character-name-queue"
|
||||
PollInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
type VictimCharacterIDMessage struct {
|
||||
VictimCharacterID int64 `json:"victim_character_id"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := config.InitConfig(); err != nil {
|
||||
logger.Error("Failed to initialize config: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.InitFlag()
|
||||
logger.Info("Starting missing victim character name reader: reading from ClickHouse and writing to NSQ")
|
||||
|
||||
chClient, err := clickhouse.NewClient()
|
||||
if err != nil {
|
||||
logger.Error("Failed to create ClickHouse client: %v", err)
|
||||
return
|
||||
}
|
||||
defer chClient.Close()
|
||||
|
||||
producer, err := lnsq.NewProducer()
|
||||
if err != nil {
|
||||
logger.Error("Failed to create NSQ producer: %v", err)
|
||||
return
|
||||
}
|
||||
defer producer.Stop()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
<-sigChan
|
||||
logger.Info("Shutdown signal received, stopping gracefully...")
|
||||
cancel()
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(PollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info("Missing victim character name reader shutting down")
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := processMissingVictimCharacterNames(ctx, chClient, producer); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
logger.Error("Failed to process missing victim character names: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processMissingVictimCharacterNames(ctx context.Context, chClient *clickhouse.ClickhouseClient, producer *nsq.Producer) error {
|
||||
victimCharacterIDs, err := queryMissingVictimCharacterNames(ctx, chClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query missing victim character names: %w", err)
|
||||
}
|
||||
|
||||
if len(victimCharacterIDs) == 0 {
|
||||
logger.Info("No missing victim character names found")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Found %d distinct missing victim character IDs, publishing in batches", len(victimCharacterIDs))
|
||||
|
||||
published := 0
|
||||
for i := 0; i < len(victimCharacterIDs); i += BatchSize {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
end := i + BatchSize
|
||||
if end > len(victimCharacterIDs) {
|
||||
end = len(victimCharacterIDs)
|
||||
}
|
||||
|
||||
batch := victimCharacterIDs[i:end]
|
||||
for _, victimCharacterID := range batch {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
message := VictimCharacterIDMessage{
|
||||
VictimCharacterID: victimCharacterID,
|
||||
}
|
||||
|
||||
messageBytes, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
logger.Error("Failed to marshal victim character ID message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
for {
|
||||
err = producer.Publish(Topic, messageBytes)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
logger.Error("Failed to publish victim character ID, retrying: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
published++
|
||||
}
|
||||
|
||||
logger.Info("Published batch of %d victim character IDs (total: %d/%d)", len(batch), published, len(victimCharacterIDs))
|
||||
}
|
||||
|
||||
logger.Info("Published all %d missing victim character IDs", published)
|
||||
return nil
|
||||
}
|
||||
|
||||
func queryMissingVictimCharacterNames(ctx context.Context, chClient *clickhouse.ClickhouseClient) ([]int64, error) {
|
||||
query := `
|
||||
SELECT DISTINCT victim_character_id
|
||||
FROM zkill.killmails
|
||||
WHERE victim_character_name = '' OR victim_character_name IS NULL
|
||||
ORDER BY victim_character_id
|
||||
`
|
||||
|
||||
rows, err := chClient.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute query: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var victimCharacterIDs []int64
|
||||
for rows.Next() {
|
||||
var victimCharacterID int64
|
||||
if err := rows.Scan(&victimCharacterID); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan row: %w", err)
|
||||
}
|
||||
victimCharacterIDs = append(victimCharacterIDs, victimCharacterID)
|
||||
}
|
||||
|
||||
return victimCharacterIDs, rows.Err()
|
||||
}
|
||||
Reference in New Issue
Block a user