Rehallucinate postgres to clickhouse
This commit is contained in:
327
db.go
327
db.go
@@ -1,12 +1,15 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
logger "git.site.quack-lab.dev/dave/cylogger"
|
||||
"gorm.io/driver/postgres"
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
"gorm.io/gorm/schema"
|
||||
)
|
||||
|
||||
@@ -35,12 +38,13 @@ type DB interface {
|
||||
DB() *gorm.DB
|
||||
Raw(sql string, args ...any) *gorm.DB
|
||||
SaveKillmails(killmails []Killmail) error
|
||||
|
||||
InitTables() error
|
||||
QueryFits(params QueryParams) (*FitStatistics, error)
|
||||
}
|
||||
|
||||
type DBWrapper struct {
|
||||
db *gorm.DB
|
||||
ch driver.Conn
|
||||
gormDB *gorm.DB // For SQLite (EVE static data)
|
||||
}
|
||||
|
||||
var db *DBWrapper
|
||||
@@ -50,33 +54,188 @@ func GetDB() (DB, error) {
|
||||
return db, nil
|
||||
}
|
||||
|
||||
dsn := "host=localhost user=postgres password=postgres dbname=zkill port=5432 sslmode=disable"
|
||||
gdb, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
|
||||
PrepareStmt: true,
|
||||
// ClickHouse connection - connect to default database first
|
||||
options := &clickhouse.Options{
|
||||
Addr: []string{"localhost:9000"},
|
||||
Auth: clickhouse.Auth{
|
||||
Database: "default",
|
||||
Username: "default",
|
||||
Password: "",
|
||||
},
|
||||
}
|
||||
|
||||
conn, err := clickhouse.Open(options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err)
|
||||
}
|
||||
|
||||
// Create database if it doesn't exist
|
||||
ctx := context.Background()
|
||||
if err := conn.Exec(ctx, "CREATE DATABASE IF NOT EXISTS zkill"); err != nil {
|
||||
return nil, fmt.Errorf("failed to create database: %w", err)
|
||||
}
|
||||
|
||||
// Reconnect to the zkill database
|
||||
options.Auth.Database = "zkill"
|
||||
conn, err = clickhouse.Open(options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to zkill database: %w", err)
|
||||
}
|
||||
|
||||
// SQLite connection for EVE static data
|
||||
sqliteDB, err := gorm.Open(sqlite.Open("sqlite-latest.sqlite"), &gorm.Config{
|
||||
NamingStrategy: schema.NamingStrategy{
|
||||
NoLowerCase: true,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to connect to SQLite: %w", err)
|
||||
}
|
||||
|
||||
db = &DBWrapper{
|
||||
ch: conn,
|
||||
gormDB: sqliteDB,
|
||||
}
|
||||
|
||||
db = &DBWrapper{db: gdb}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func (db *DBWrapper) InitTables() error {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create flat_killmails table
|
||||
createFlatKillmails := `
|
||||
CREATE TABLE IF NOT EXISTS flat_killmails (
|
||||
killmail_id Int64,
|
||||
killmail_time DateTime,
|
||||
solar_system_id Int64,
|
||||
killmail_hash String,
|
||||
victim_ship_type_id Int64,
|
||||
victim_character_id Int64,
|
||||
victim_corporation_id Int64,
|
||||
victim_alliance_id Int64,
|
||||
victim_damage_taken Int64,
|
||||
victim_pos_x Float64,
|
||||
victim_pos_y Float64,
|
||||
victim_pos_z Float64,
|
||||
attacker_count UInt16,
|
||||
total_damage_done Int64,
|
||||
final_blow_ship_type Int64,
|
||||
attackers Array(Tuple(
|
||||
Int64, -- character_id
|
||||
Int64, -- corporation_id
|
||||
Int64, -- alliance_id
|
||||
Int64, -- ship_type_id
|
||||
Int64, -- weapon_type_id
|
||||
Int64, -- damage_done
|
||||
UInt8, -- final_blow
|
||||
Float64 -- security_status
|
||||
)),
|
||||
items Array(Tuple(
|
||||
Int64, -- flag
|
||||
Int64, -- item_type_id
|
||||
Int64, -- quantity_destroyed
|
||||
Int64, -- quantity_dropped
|
||||
Int64 -- singleton
|
||||
))
|
||||
) ENGINE = MergeTree()
|
||||
ORDER BY (solar_system_id, victim_ship_type_id, killmail_time)`
|
||||
|
||||
if err := db.ch.Exec(ctx, createFlatKillmails); err != nil {
|
||||
return fmt.Errorf("failed to create flat_killmails table: %w", err)
|
||||
}
|
||||
|
||||
// Create fitted_modules table
|
||||
createFittedModules := `
|
||||
CREATE TABLE IF NOT EXISTS fitted_modules (
|
||||
killmail_id Int64,
|
||||
killmail_time DateTime,
|
||||
solar_system_id Int64,
|
||||
victim_ship_type_id Int64,
|
||||
item_type_id Int64,
|
||||
flag Int64,
|
||||
quantity_destroyed Int64,
|
||||
quantity_dropped Int64
|
||||
) ENGINE = MergeTree()
|
||||
ORDER BY (item_type_id, victim_ship_type_id, solar_system_id)`
|
||||
|
||||
if err := db.ch.Exec(ctx, createFittedModules); err != nil {
|
||||
return fmt.Errorf("failed to create fitted_modules table: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DBWrapper) Raw(sql string, args ...any) *gorm.DB {
|
||||
return db.db.Raw(sql, args...)
|
||||
return db.gormDB.Raw(sql, args...)
|
||||
}
|
||||
|
||||
func (db *DBWrapper) DB() *gorm.DB {
|
||||
return db.db
|
||||
return db.gormDB
|
||||
}
|
||||
|
||||
func (db *DBWrapper) SaveKillmails(killmails []Killmail) error {
|
||||
return db.db.Session(&gorm.Session{FullSaveAssociations: true}).
|
||||
Clauses(clause.OnConflict{DoNothing: true}).
|
||||
CreateInBatches(killmails, 10).Error
|
||||
ctx := context.Background()
|
||||
|
||||
// Prepare batch for flat_killmails
|
||||
flatBatch, err := db.ch.PrepareBatch(ctx, "INSERT INTO flat_killmails")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prepare flat_killmails batch: %w", err)
|
||||
}
|
||||
|
||||
// Prepare batch for fitted_modules
|
||||
moduleBatch, err := db.ch.PrepareBatch(ctx, "INSERT INTO fitted_modules")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prepare fitted_modules batch: %w", err)
|
||||
}
|
||||
|
||||
// Process in batches
|
||||
batchSize := 1000
|
||||
for i := 0; i < len(killmails); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(killmails) {
|
||||
end = len(killmails)
|
||||
}
|
||||
|
||||
for _, km := range killmails[i:end] {
|
||||
flat := km.FlattenKillmail()
|
||||
modules := km.ExtractFittedModules()
|
||||
|
||||
// Append to flat_killmails batch
|
||||
if err := flatBatch.AppendStruct(flat); err != nil {
|
||||
return fmt.Errorf("failed to append flat killmail: %w", err)
|
||||
}
|
||||
|
||||
// Append modules to fitted_modules batch
|
||||
for _, mod := range modules {
|
||||
if err := moduleBatch.AppendStruct(&mod); err != nil {
|
||||
return fmt.Errorf("failed to append module: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send batches every 1000 records
|
||||
if err := flatBatch.Send(); err != nil {
|
||||
return fmt.Errorf("failed to send flat_killmails batch: %w", err)
|
||||
}
|
||||
if err := moduleBatch.Send(); err != nil {
|
||||
return fmt.Errorf("failed to send fitted_modules batch: %w", err)
|
||||
}
|
||||
|
||||
// Prepare new batches for next iteration
|
||||
if end < len(killmails) {
|
||||
flatBatch, err = db.ch.PrepareBatch(ctx, "INSERT INTO flat_killmails")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prepare flat_killmails batch: %w", err)
|
||||
}
|
||||
moduleBatch, err = db.ch.PrepareBatch(ctx, "INSERT INTO fitted_modules")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prepare fitted_modules batch: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DBWrapper) QueryFits(params QueryParams) (*FitStatistics, error) {
|
||||
@@ -86,26 +245,30 @@ func (db *DBWrapper) QueryFits(params QueryParams) (*FitStatistics, error) {
|
||||
modules := deduplicateInt64(params.Modules)
|
||||
flog.Debug("Deduplicated modules: %d -> %d", len(params.Modules), len(modules))
|
||||
|
||||
query := db.db.Table("public.zkill_killmails").
|
||||
Select("DISTINCT zkill_killmails.killmail_id, zkill_killmails.solar_system_id").
|
||||
Joins("INNER JOIN public.zkill_victims ON zkill_killmails.killmail_id = zkill_victims.killmail_id").
|
||||
Where("zkill_victims.ship_type_id = ?", params.Ship)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Build ClickHouse query
|
||||
query := "SELECT killmail_id, solar_system_id FROM flat_killmails WHERE victim_ship_type_id = ?"
|
||||
args := []interface{}{params.Ship}
|
||||
|
||||
flog.Debug("Checking total killmails for ship type %d", params.Ship)
|
||||
var totalCount int64
|
||||
countResult := db.db.Table("public.zkill_killmails").
|
||||
Joins("INNER JOIN public.zkill_victims ON zkill_killmails.killmail_id = zkill_victims.killmail_id").
|
||||
Where("zkill_victims.ship_type_id = ?", params.Ship).
|
||||
Count(&totalCount)
|
||||
if countResult.Error != nil {
|
||||
flog.Error("Failed to count total killmails: %v", countResult.Error)
|
||||
var totalCount uint64
|
||||
countQuery := "SELECT count() FROM flat_killmails WHERE victim_ship_type_id = ?"
|
||||
if err := db.ch.QueryRow(ctx, countQuery, params.Ship).Scan(&totalCount); err != nil {
|
||||
flog.Error("Failed to count total killmails: %v", err)
|
||||
} else {
|
||||
flog.Info("Total killmails for ship type %d: %d", params.Ship, totalCount)
|
||||
}
|
||||
flog.Debug("Base query: ship_type_id = %d", params.Ship)
|
||||
flog.Debug("Base query: victim_ship_type_id = %d", params.Ship)
|
||||
|
||||
if len(params.Systems) > 0 {
|
||||
query = query.Where("zkill_killmails.solar_system_id IN ?", params.Systems)
|
||||
// Build IN clause with placeholders
|
||||
placeholders := make([]string, len(params.Systems))
|
||||
for i := range params.Systems {
|
||||
placeholders[i] = "?"
|
||||
args = append(args, params.Systems[i])
|
||||
}
|
||||
query += " AND solar_system_id IN (" + fmt.Sprintf("%s", placeholders) + ")"
|
||||
flog.Debug("Added system filter: %d systems", len(params.Systems))
|
||||
}
|
||||
|
||||
@@ -142,27 +305,37 @@ func (db *DBWrapper) QueryFits(params QueryParams) (*FitStatistics, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
query = query.Where("EXISTS (SELECT 1 FROM public.zkill_items WHERE victim_id = zkill_victims.id AND item_type_id = ? AND flag BETWEEN ? AND ?)",
|
||||
moduleID, flagMin, flagMax)
|
||||
query += " AND killmail_id IN (SELECT killmail_id FROM fitted_modules WHERE item_type_id = ? AND flag BETWEEN ? AND ?)"
|
||||
args = append(args, moduleID, flagMin, flagMax)
|
||||
flog.Debug("Added module filter: module %d in %s slot (flags %d-%d)", moduleID, slot, flagMin, flagMax)
|
||||
}
|
||||
}
|
||||
|
||||
var filteredResults []struct {
|
||||
KillmailID int64 `gorm:"column:killmail_id"`
|
||||
SolarSystemID int64 `gorm:"column:solar_system_id"`
|
||||
var killmailIDs []int64
|
||||
var systemIDs []int64
|
||||
|
||||
flog.Debug("Executing filtered query to get killmail IDs")
|
||||
rows, err := db.ch.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
flog.Error("Failed to execute query: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
flog.Debug("Executing filtered query")
|
||||
result := query.Find(&filteredResults)
|
||||
if result.Error != nil {
|
||||
flog.Error("Failed to find killmails: %v", result.Error)
|
||||
return nil, result.Error
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var killmailID, systemID int64
|
||||
if err := rows.Scan(&killmailID, &systemID); err != nil {
|
||||
flog.Error("Failed to scan row: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
killmailIDs = append(killmailIDs, killmailID)
|
||||
systemIDs = append(systemIDs, systemID)
|
||||
}
|
||||
|
||||
totalKillmails := int64(len(filteredResults))
|
||||
totalKillmails := int64(len(killmailIDs))
|
||||
flog.Info("Found %d killmails after filtering", totalKillmails)
|
||||
if totalKillmails > 0 {
|
||||
flog.Debug("Sample killmail IDs: %v", filteredResults[:min(5, len(filteredResults))])
|
||||
flog.Debug("Sample killmail IDs: %v", killmailIDs[:min(5, len(killmailIDs))])
|
||||
}
|
||||
|
||||
stats := &FitStatistics{
|
||||
@@ -181,19 +354,14 @@ func (db *DBWrapper) QueryFits(params QueryParams) (*FitStatistics, error) {
|
||||
}
|
||||
|
||||
flog.Debug("Calculating system breakdown")
|
||||
for _, r := range filteredResults {
|
||||
stats.SystemBreakdown[r.SolarSystemID]++
|
||||
for _, systemID := range systemIDs {
|
||||
stats.SystemBreakdown[systemID]++
|
||||
}
|
||||
flog.Debug("System breakdown: %d unique systems", len(stats.SystemBreakdown))
|
||||
|
||||
killmailIDs := make([]int64, len(filteredResults))
|
||||
for i, r := range filteredResults {
|
||||
killmailIDs[i] = r.KillmailID
|
||||
}
|
||||
flog.Debug("Calculating module statistics for %d killmails", len(killmailIDs))
|
||||
|
||||
err := db.calculateModuleStats(killmailIDs, stats, totalKillmails, flog)
|
||||
if err != nil {
|
||||
if err := db.calculateModuleStats(killmailIDs, stats, totalKillmails, flog); err != nil {
|
||||
flog.Error("Failed to calculate module stats: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
@@ -239,9 +407,9 @@ func (db *DBWrapper) getModuleSlots(moduleIDs []int64) (map[int64]string, error)
|
||||
TypeID int32
|
||||
EffectID int32
|
||||
}
|
||||
err := db.db.Table("evesde.dgmTypeEffects").
|
||||
Select("\"typeID\", \"effectID\"").
|
||||
Where("\"typeID\" IN ? AND \"effectID\" IN (11, 12, 13, 2663)", uncached).
|
||||
err := db.gormDB.Table("dgmTypeEffects").
|
||||
Select("typeID, effectID").
|
||||
Where("typeID IN ? AND effectID IN (11, 12, 13, 2663)", uncached).
|
||||
Find(&effects).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -265,11 +433,11 @@ func (db *DBWrapper) getModuleSlots(moduleIDs []int64) (map[int64]string, error)
|
||||
|
||||
droneCategoryID := int32(18)
|
||||
var droneTypeIDs []int32
|
||||
err = db.db.Table("evesde.invTypes").
|
||||
Select("\"invTypes\".\"typeID\"").
|
||||
Joins("INNER JOIN evesde.\"invGroups\" ON \"invTypes\".\"groupID\" = \"invGroups\".\"groupID\"").
|
||||
Where("\"invTypes\".\"typeID\" IN ? AND \"invGroups\".\"categoryID\" = ?", uncached, droneCategoryID).
|
||||
Pluck("\"invTypes\".\"typeID\"", &droneTypeIDs).Error
|
||||
err = db.gormDB.Table("invTypes").
|
||||
Select("invTypes.typeID").
|
||||
Joins("INNER JOIN invGroups ON invTypes.groupID = invGroups.groupID").
|
||||
Where("invTypes.typeID IN ? AND invGroups.categoryID = ?", uncached, droneCategoryID).
|
||||
Pluck("invTypes.typeID", &droneTypeIDs).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -285,29 +453,54 @@ func (db *DBWrapper) getModuleSlots(moduleIDs []int64) (map[int64]string, error)
|
||||
func (db *DBWrapper) calculateModuleStats(killmailIDs []int64, stats *FitStatistics, total int64, flog *logger.Logger) error {
|
||||
flog.Debug("Querying module stats for %d killmails", len(killmailIDs))
|
||||
|
||||
var items []struct {
|
||||
ItemTypeID int32 `gorm:"column:item_type_id"`
|
||||
Flag int64 `gorm:"column:flag"`
|
||||
Count int64 `gorm:"column:count"`
|
||||
if len(killmailIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := db.db.Table("public.zkill_items").
|
||||
Select("zkill_items.item_type_id, zkill_items.flag, COUNT(DISTINCT zkill_victims.killmail_id) as count").
|
||||
Joins("INNER JOIN public.zkill_victims ON zkill_items.victim_id = zkill_victims.id").
|
||||
Where("zkill_victims.killmail_id IN ?", killmailIDs).
|
||||
Group("zkill_items.item_type_id, zkill_items.flag").
|
||||
Find(&items).Error
|
||||
ctx := context.Background()
|
||||
|
||||
// Build IN clause with placeholders
|
||||
placeholders := make([]string, len(killmailIDs))
|
||||
args := make([]interface{}, len(killmailIDs))
|
||||
for i, id := range killmailIDs {
|
||||
placeholders[i] = "?"
|
||||
args[i] = id
|
||||
}
|
||||
query := fmt.Sprintf("SELECT item_type_id, flag, count(DISTINCT killmail_id) as count FROM fitted_modules WHERE killmail_id IN (%s) GROUP BY item_type_id, flag",
|
||||
strings.Join(placeholders, ","))
|
||||
|
||||
rows, err := db.ch.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
flog.Error("Failed to query module stats: %v", err)
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var items []struct {
|
||||
ItemTypeID int32
|
||||
Flag int64
|
||||
Count uint64
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var item struct {
|
||||
ItemTypeID int32
|
||||
Flag int64
|
||||
Count uint64
|
||||
}
|
||||
if err := rows.Scan(&item.ItemTypeID, &item.Flag, &item.Count); err != nil {
|
||||
flog.Error("Failed to scan module stat: %v", err)
|
||||
return err
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
|
||||
flog.Debug("Found %d item type/flag combinations", len(items))
|
||||
|
||||
for _, item := range items {
|
||||
percentage := float64(item.Count) / float64(total) * 100.0
|
||||
moduleStats := ModuleStats{
|
||||
Count: item.Count,
|
||||
Count: int64(item.Count),
|
||||
Percentage: percentage,
|
||||
}
|
||||
|
||||
|
||||
7
main.go
7
main.go
@@ -14,6 +14,8 @@ func main() {
|
||||
ingest := flag.Bool("ingest", false, "ingest killmails from data directory")
|
||||
flag.Parse()
|
||||
logger.InitFlag()
|
||||
logger.Info("Starting")
|
||||
|
||||
db, err := GetDB()
|
||||
if err != nil {
|
||||
logger.Error("Failed to get database: %v", err)
|
||||
@@ -21,11 +23,6 @@ func main() {
|
||||
}
|
||||
|
||||
if *ingest {
|
||||
err = db.DB().AutoMigrate(&Killmail{}, &Attacker{}, &Victim{}, &Item{}, &Position{})
|
||||
if err != nil {
|
||||
logger.Error("Failed to migrate database: %v", err)
|
||||
return
|
||||
}
|
||||
DoIngest()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user