132 lines
3.6 KiB
Go
132 lines
3.6 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"git.site.quack-lab.dev/dave/cyutils"
|
|
)
|
|
|
|
// SaveFlatKillmails saves flattened killmails, attackers, and items to ClickHouse using JSON format
|
|
func (db *DBWrapper) SaveFlatKillmails(
|
|
killmails []*FlatKillmail,
|
|
attackers []FlatKillmailAttacker,
|
|
items []FlatKillmailItem,
|
|
) error {
|
|
ctx := context.Background()
|
|
|
|
// Insert in batches
|
|
if err := db.insertKillmailsJSON(ctx, killmails); err != nil {
|
|
return fmt.Errorf("failed to insert killmails: %w", err)
|
|
}
|
|
if err := db.insertAttackersJSON(ctx, attackers); err != nil {
|
|
return fmt.Errorf("failed to insert attackers: %w", err)
|
|
}
|
|
if err := db.insertItemsJSON(ctx, items); err != nil {
|
|
return fmt.Errorf("failed to insert items: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// insertKillmailsJSON inserts killmails using JSON format
|
|
func (db *DBWrapper) insertKillmailsJSON(ctx context.Context, killmails []*FlatKillmail) error {
|
|
if len(killmails) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var batchErrors []error
|
|
cyutils.Batched(killmails, 1000, func(batch []*FlatKillmail) {
|
|
var jsonRows []string
|
|
for _, km := range batch {
|
|
jsonBytes, err := json.Marshal(km)
|
|
if err != nil {
|
|
batchErrors = append(batchErrors, fmt.Errorf("failed to marshal killmail %d: %w", km.KillmailID, err))
|
|
continue
|
|
}
|
|
jsonRows = append(jsonRows, string(jsonBytes))
|
|
}
|
|
|
|
if len(jsonRows) > 0 {
|
|
jsonData := strings.Join(jsonRows, "\n")
|
|
query := fmt.Sprintf("INSERT INTO zkill.killmails FORMAT JSONEachRow\n%s", jsonData)
|
|
if err := db.ch.Exec(ctx, query); err != nil {
|
|
batchErrors = append(batchErrors, fmt.Errorf("failed to insert killmails batch: %w", err))
|
|
}
|
|
}
|
|
})
|
|
|
|
if len(batchErrors) > 0 {
|
|
return fmt.Errorf("batch errors: %v", batchErrors)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// insertAttackersJSON inserts attackers using JSON format
|
|
func (db *DBWrapper) insertAttackersJSON(ctx context.Context, attackers []FlatKillmailAttacker) error {
|
|
if len(attackers) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var batchErrors []error
|
|
cyutils.Batched(attackers, 1000, func(batch []FlatKillmailAttacker) {
|
|
var jsonRows []string
|
|
for _, att := range batch {
|
|
jsonBytes, err := json.Marshal(att)
|
|
if err != nil {
|
|
batchErrors = append(batchErrors, fmt.Errorf("failed to marshal attacker: %w", err))
|
|
continue
|
|
}
|
|
jsonRows = append(jsonRows, string(jsonBytes))
|
|
}
|
|
|
|
if len(jsonRows) > 0 {
|
|
jsonData := strings.Join(jsonRows, "\n")
|
|
query := fmt.Sprintf("INSERT INTO zkill.killmail_attackers FORMAT JSONEachRow\n%s", jsonData)
|
|
if err := db.ch.Exec(ctx, query); err != nil {
|
|
batchErrors = append(batchErrors, fmt.Errorf("failed to insert attackers batch: %w", err))
|
|
}
|
|
}
|
|
})
|
|
|
|
if len(batchErrors) > 0 {
|
|
return fmt.Errorf("batch errors: %v", batchErrors)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// insertItemsJSON inserts items using JSON format
|
|
func (db *DBWrapper) insertItemsJSON(ctx context.Context, items []FlatKillmailItem) error {
|
|
if len(items) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var batchErrors []error
|
|
cyutils.Batched(items, 1000, func(batch []FlatKillmailItem) {
|
|
var jsonRows []string
|
|
for _, item := range batch {
|
|
jsonBytes, err := json.Marshal(item)
|
|
if err != nil {
|
|
batchErrors = append(batchErrors, fmt.Errorf("failed to marshal item: %w", err))
|
|
continue
|
|
}
|
|
jsonRows = append(jsonRows, string(jsonBytes))
|
|
}
|
|
|
|
if len(jsonRows) > 0 {
|
|
jsonData := strings.Join(jsonRows, "\n")
|
|
query := fmt.Sprintf("INSERT INTO zkill.killmail_items FORMAT JSONEachRow\n%s", jsonData)
|
|
if err := db.ch.Exec(ctx, query); err != nil {
|
|
batchErrors = append(batchErrors, fmt.Errorf("failed to insert items batch: %w", err))
|
|
}
|
|
}
|
|
})
|
|
|
|
if len(batchErrors) > 0 {
|
|
return fmt.Errorf("batch errors: %v", batchErrors)
|
|
}
|
|
return nil
|
|
}
|