Files
zkill-susser/clickhouse.go

132 lines
3.6 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"strings"
"git.site.quack-lab.dev/dave/cyutils"
)
// SaveFlatKillmails saves flattened killmails, attackers, and items to ClickHouse using JSON format
func (db *DBWrapper) SaveFlatKillmails(
killmails []*FlatKillmail,
attackers []FlatKillmailAttacker,
items []FlatKillmailItem,
) error {
ctx := context.Background()
// Insert in batches
if err := db.insertKillmailsJSON(ctx, killmails); err != nil {
return fmt.Errorf("failed to insert killmails: %w", err)
}
if err := db.insertAttackersJSON(ctx, attackers); err != nil {
return fmt.Errorf("failed to insert attackers: %w", err)
}
if err := db.insertItemsJSON(ctx, items); err != nil {
return fmt.Errorf("failed to insert items: %w", err)
}
return nil
}
// insertKillmailsJSON inserts killmails using JSON format
func (db *DBWrapper) insertKillmailsJSON(ctx context.Context, killmails []*FlatKillmail) error {
if len(killmails) == 0 {
return nil
}
var batchErrors []error
cyutils.Batched(killmails, 1000, func(batch []*FlatKillmail) {
var jsonRows []string
for _, km := range batch {
jsonBytes, err := json.Marshal(km)
if err != nil {
batchErrors = append(batchErrors, fmt.Errorf("failed to marshal killmail %d: %w", km.KillmailID, err))
continue
}
jsonRows = append(jsonRows, string(jsonBytes))
}
if len(jsonRows) > 0 {
jsonData := strings.Join(jsonRows, "\n")
query := fmt.Sprintf("INSERT INTO zkill.killmails FORMAT JSONEachRow\n%s", jsonData)
if err := db.ch.Exec(ctx, query); err != nil {
batchErrors = append(batchErrors, fmt.Errorf("failed to insert killmails batch: %w", err))
}
}
})
if len(batchErrors) > 0 {
return fmt.Errorf("batch errors: %v", batchErrors)
}
return nil
}
// insertAttackersJSON inserts attackers using JSON format
func (db *DBWrapper) insertAttackersJSON(ctx context.Context, attackers []FlatKillmailAttacker) error {
if len(attackers) == 0 {
return nil
}
var batchErrors []error
cyutils.Batched(attackers, 1000, func(batch []FlatKillmailAttacker) {
var jsonRows []string
for _, att := range batch {
jsonBytes, err := json.Marshal(att)
if err != nil {
batchErrors = append(batchErrors, fmt.Errorf("failed to marshal attacker: %w", err))
continue
}
jsonRows = append(jsonRows, string(jsonBytes))
}
if len(jsonRows) > 0 {
jsonData := strings.Join(jsonRows, "\n")
query := fmt.Sprintf("INSERT INTO zkill.killmail_attackers FORMAT JSONEachRow\n%s", jsonData)
if err := db.ch.Exec(ctx, query); err != nil {
batchErrors = append(batchErrors, fmt.Errorf("failed to insert attackers batch: %w", err))
}
}
})
if len(batchErrors) > 0 {
return fmt.Errorf("batch errors: %v", batchErrors)
}
return nil
}
// insertItemsJSON inserts items using JSON format
func (db *DBWrapper) insertItemsJSON(ctx context.Context, items []FlatKillmailItem) error {
if len(items) == 0 {
return nil
}
var batchErrors []error
cyutils.Batched(items, 1000, func(batch []FlatKillmailItem) {
var jsonRows []string
for _, item := range batch {
jsonBytes, err := json.Marshal(item)
if err != nil {
batchErrors = append(batchErrors, fmt.Errorf("failed to marshal item: %w", err))
continue
}
jsonRows = append(jsonRows, string(jsonBytes))
}
if len(jsonRows) > 0 {
jsonData := strings.Join(jsonRows, "\n")
query := fmt.Sprintf("INSERT INTO zkill.killmail_items FORMAT JSONEachRow\n%s", jsonData)
if err := db.ch.Exec(ctx, query); err != nil {
batchErrors = append(batchErrors, fmt.Errorf("failed to insert items batch: %w", err))
}
}
})
if len(batchErrors) > 0 {
return fmt.Errorf("batch errors: %v", batchErrors)
}
return nil
}