diff --git a/db.go b/db.go index 5cfabe9..4efb285 100644 --- a/db.go +++ b/db.go @@ -1,12 +1,15 @@ package main import ( + "context" "fmt" + "strings" logger "git.site.quack-lab.dev/dave/cylogger" - "gorm.io/driver/postgres" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "gorm.io/driver/sqlite" "gorm.io/gorm" - "gorm.io/gorm/clause" "gorm.io/gorm/schema" ) @@ -35,12 +38,13 @@ type DB interface { DB() *gorm.DB Raw(sql string, args ...any) *gorm.DB SaveKillmails(killmails []Killmail) error - + InitTables() error QueryFits(params QueryParams) (*FitStatistics, error) } type DBWrapper struct { - db *gorm.DB + ch driver.Conn + gormDB *gorm.DB // For SQLite (EVE static data) } var db *DBWrapper @@ -50,33 +54,188 @@ func GetDB() (DB, error) { return db, nil } - dsn := "host=localhost user=postgres password=postgres dbname=zkill port=5432 sslmode=disable" - gdb, err := gorm.Open(postgres.Open(dsn), &gorm.Config{ - PrepareStmt: true, + // ClickHouse connection - connect to default database first + options := &clickhouse.Options{ + Addr: []string{"localhost:9000"}, + Auth: clickhouse.Auth{ + Database: "default", + Username: "default", + Password: "", + }, + } + + conn, err := clickhouse.Open(options) + if err != nil { + return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err) + } + + // Create database if it doesn't exist + ctx := context.Background() + if err := conn.Exec(ctx, "CREATE DATABASE IF NOT EXISTS zkill"); err != nil { + return nil, fmt.Errorf("failed to create database: %w", err) + } + + // Reconnect to the zkill database + options.Auth.Database = "zkill" + conn, err = clickhouse.Open(options) + if err != nil { + return nil, fmt.Errorf("failed to connect to zkill database: %w", err) + } + + // SQLite connection for EVE static data + sqliteDB, err := gorm.Open(sqlite.Open("sqlite-latest.sqlite"), &gorm.Config{ NamingStrategy: schema.NamingStrategy{ NoLowerCase: true, }, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to connect to SQLite: %w", err) + } + + db = &DBWrapper{ + ch: conn, + gormDB: sqliteDB, } - db = &DBWrapper{db: gdb} return db, nil } +func (db *DBWrapper) InitTables() error { + ctx := context.Background() + + // Create flat_killmails table + createFlatKillmails := ` + CREATE TABLE IF NOT EXISTS flat_killmails ( + killmail_id Int64, + killmail_time DateTime, + solar_system_id Int64, + killmail_hash String, + victim_ship_type_id Int64, + victim_character_id Int64, + victim_corporation_id Int64, + victim_alliance_id Int64, + victim_damage_taken Int64, + victim_pos_x Float64, + victim_pos_y Float64, + victim_pos_z Float64, + attacker_count UInt16, + total_damage_done Int64, + final_blow_ship_type Int64, + attackers Array(Tuple( + Int64, -- character_id + Int64, -- corporation_id + Int64, -- alliance_id + Int64, -- ship_type_id + Int64, -- weapon_type_id + Int64, -- damage_done + UInt8, -- final_blow + Float64 -- security_status + )), + items Array(Tuple( + Int64, -- flag + Int64, -- item_type_id + Int64, -- quantity_destroyed + Int64, -- quantity_dropped + Int64 -- singleton + )) + ) ENGINE = MergeTree() + ORDER BY (solar_system_id, victim_ship_type_id, killmail_time)` + + if err := db.ch.Exec(ctx, createFlatKillmails); err != nil { + return fmt.Errorf("failed to create flat_killmails table: %w", err) + } + + // Create fitted_modules table + createFittedModules := ` + CREATE TABLE IF NOT EXISTS fitted_modules ( + killmail_id Int64, + killmail_time DateTime, + solar_system_id Int64, + victim_ship_type_id Int64, + item_type_id Int64, + flag Int64, + quantity_destroyed Int64, + quantity_dropped Int64 + ) ENGINE = MergeTree() + ORDER BY (item_type_id, victim_ship_type_id, solar_system_id)` + + if err := db.ch.Exec(ctx, createFittedModules); err != nil { + return fmt.Errorf("failed to create fitted_modules table: %w", err) + } + + return nil +} + func (db *DBWrapper) Raw(sql string, args ...any) *gorm.DB { - return db.db.Raw(sql, args...) + return db.gormDB.Raw(sql, args...) } func (db *DBWrapper) DB() *gorm.DB { - return db.db + return db.gormDB } func (db *DBWrapper) SaveKillmails(killmails []Killmail) error { - return db.db.Session(&gorm.Session{FullSaveAssociations: true}). - Clauses(clause.OnConflict{DoNothing: true}). - CreateInBatches(killmails, 10).Error + ctx := context.Background() + + // Prepare batch for flat_killmails + flatBatch, err := db.ch.PrepareBatch(ctx, "INSERT INTO flat_killmails") + if err != nil { + return fmt.Errorf("failed to prepare flat_killmails batch: %w", err) + } + + // Prepare batch for fitted_modules + moduleBatch, err := db.ch.PrepareBatch(ctx, "INSERT INTO fitted_modules") + if err != nil { + return fmt.Errorf("failed to prepare fitted_modules batch: %w", err) + } + + // Process in batches + batchSize := 1000 + for i := 0; i < len(killmails); i += batchSize { + end := i + batchSize + if end > len(killmails) { + end = len(killmails) + } + + for _, km := range killmails[i:end] { + flat := km.FlattenKillmail() + modules := km.ExtractFittedModules() + + // Append to flat_killmails batch + if err := flatBatch.AppendStruct(flat); err != nil { + return fmt.Errorf("failed to append flat killmail: %w", err) + } + + // Append modules to fitted_modules batch + for _, mod := range modules { + if err := moduleBatch.AppendStruct(&mod); err != nil { + return fmt.Errorf("failed to append module: %w", err) + } + } + } + + // Send batches every 1000 records + if err := flatBatch.Send(); err != nil { + return fmt.Errorf("failed to send flat_killmails batch: %w", err) + } + if err := moduleBatch.Send(); err != nil { + return fmt.Errorf("failed to send fitted_modules batch: %w", err) + } + + // Prepare new batches for next iteration + if end < len(killmails) { + flatBatch, err = db.ch.PrepareBatch(ctx, "INSERT INTO flat_killmails") + if err != nil { + return fmt.Errorf("failed to prepare flat_killmails batch: %w", err) + } + moduleBatch, err = db.ch.PrepareBatch(ctx, "INSERT INTO fitted_modules") + if err != nil { + return fmt.Errorf("failed to prepare fitted_modules batch: %w", err) + } + } + } + + return nil } func (db *DBWrapper) QueryFits(params QueryParams) (*FitStatistics, error) { @@ -86,26 +245,30 @@ func (db *DBWrapper) QueryFits(params QueryParams) (*FitStatistics, error) { modules := deduplicateInt64(params.Modules) flog.Debug("Deduplicated modules: %d -> %d", len(params.Modules), len(modules)) - query := db.db.Table("public.zkill_killmails"). - Select("DISTINCT zkill_killmails.killmail_id, zkill_killmails.solar_system_id"). - Joins("INNER JOIN public.zkill_victims ON zkill_killmails.killmail_id = zkill_victims.killmail_id"). - Where("zkill_victims.ship_type_id = ?", params.Ship) - + ctx := context.Background() + + // Build ClickHouse query + query := "SELECT killmail_id, solar_system_id FROM flat_killmails WHERE victim_ship_type_id = ?" + args := []interface{}{params.Ship} + flog.Debug("Checking total killmails for ship type %d", params.Ship) - var totalCount int64 - countResult := db.db.Table("public.zkill_killmails"). - Joins("INNER JOIN public.zkill_victims ON zkill_killmails.killmail_id = zkill_victims.killmail_id"). - Where("zkill_victims.ship_type_id = ?", params.Ship). - Count(&totalCount) - if countResult.Error != nil { - flog.Error("Failed to count total killmails: %v", countResult.Error) + var totalCount uint64 + countQuery := "SELECT count() FROM flat_killmails WHERE victim_ship_type_id = ?" + if err := db.ch.QueryRow(ctx, countQuery, params.Ship).Scan(&totalCount); err != nil { + flog.Error("Failed to count total killmails: %v", err) } else { flog.Info("Total killmails for ship type %d: %d", params.Ship, totalCount) } - flog.Debug("Base query: ship_type_id = %d", params.Ship) + flog.Debug("Base query: victim_ship_type_id = %d", params.Ship) if len(params.Systems) > 0 { - query = query.Where("zkill_killmails.solar_system_id IN ?", params.Systems) + // Build IN clause with placeholders + placeholders := make([]string, len(params.Systems)) + for i := range params.Systems { + placeholders[i] = "?" + args = append(args, params.Systems[i]) + } + query += " AND solar_system_id IN (" + fmt.Sprintf("%s", placeholders) + ")" flog.Debug("Added system filter: %d systems", len(params.Systems)) } @@ -142,27 +305,37 @@ func (db *DBWrapper) QueryFits(params QueryParams) (*FitStatistics, error) { continue } - query = query.Where("EXISTS (SELECT 1 FROM public.zkill_items WHERE victim_id = zkill_victims.id AND item_type_id = ? AND flag BETWEEN ? AND ?)", - moduleID, flagMin, flagMax) + query += " AND killmail_id IN (SELECT killmail_id FROM fitted_modules WHERE item_type_id = ? AND flag BETWEEN ? AND ?)" + args = append(args, moduleID, flagMin, flagMax) flog.Debug("Added module filter: module %d in %s slot (flags %d-%d)", moduleID, slot, flagMin, flagMax) } } - var filteredResults []struct { - KillmailID int64 `gorm:"column:killmail_id"` - SolarSystemID int64 `gorm:"column:solar_system_id"` + var killmailIDs []int64 + var systemIDs []int64 + + flog.Debug("Executing filtered query to get killmail IDs") + rows, err := db.ch.Query(ctx, query, args...) + if err != nil { + flog.Error("Failed to execute query: %v", err) + return nil, err } - flog.Debug("Executing filtered query") - result := query.Find(&filteredResults) - if result.Error != nil { - flog.Error("Failed to find killmails: %v", result.Error) - return nil, result.Error + defer rows.Close() + + for rows.Next() { + var killmailID, systemID int64 + if err := rows.Scan(&killmailID, &systemID); err != nil { + flog.Error("Failed to scan row: %v", err) + return nil, err + } + killmailIDs = append(killmailIDs, killmailID) + systemIDs = append(systemIDs, systemID) } - totalKillmails := int64(len(filteredResults)) + totalKillmails := int64(len(killmailIDs)) flog.Info("Found %d killmails after filtering", totalKillmails) if totalKillmails > 0 { - flog.Debug("Sample killmail IDs: %v", filteredResults[:min(5, len(filteredResults))]) + flog.Debug("Sample killmail IDs: %v", killmailIDs[:min(5, len(killmailIDs))]) } stats := &FitStatistics{ @@ -181,19 +354,14 @@ func (db *DBWrapper) QueryFits(params QueryParams) (*FitStatistics, error) { } flog.Debug("Calculating system breakdown") - for _, r := range filteredResults { - stats.SystemBreakdown[r.SolarSystemID]++ + for _, systemID := range systemIDs { + stats.SystemBreakdown[systemID]++ } flog.Debug("System breakdown: %d unique systems", len(stats.SystemBreakdown)) - killmailIDs := make([]int64, len(filteredResults)) - for i, r := range filteredResults { - killmailIDs[i] = r.KillmailID - } flog.Debug("Calculating module statistics for %d killmails", len(killmailIDs)) - err := db.calculateModuleStats(killmailIDs, stats, totalKillmails, flog) - if err != nil { + if err := db.calculateModuleStats(killmailIDs, stats, totalKillmails, flog); err != nil { flog.Error("Failed to calculate module stats: %v", err) return nil, err } @@ -239,9 +407,9 @@ func (db *DBWrapper) getModuleSlots(moduleIDs []int64) (map[int64]string, error) TypeID int32 EffectID int32 } - err := db.db.Table("evesde.dgmTypeEffects"). - Select("\"typeID\", \"effectID\""). - Where("\"typeID\" IN ? AND \"effectID\" IN (11, 12, 13, 2663)", uncached). + err := db.gormDB.Table("dgmTypeEffects"). + Select("typeID, effectID"). + Where("typeID IN ? AND effectID IN (11, 12, 13, 2663)", uncached). Find(&effects).Error if err != nil { return nil, err @@ -265,11 +433,11 @@ func (db *DBWrapper) getModuleSlots(moduleIDs []int64) (map[int64]string, error) droneCategoryID := int32(18) var droneTypeIDs []int32 - err = db.db.Table("evesde.invTypes"). - Select("\"invTypes\".\"typeID\""). - Joins("INNER JOIN evesde.\"invGroups\" ON \"invTypes\".\"groupID\" = \"invGroups\".\"groupID\""). - Where("\"invTypes\".\"typeID\" IN ? AND \"invGroups\".\"categoryID\" = ?", uncached, droneCategoryID). - Pluck("\"invTypes\".\"typeID\"", &droneTypeIDs).Error + err = db.gormDB.Table("invTypes"). + Select("invTypes.typeID"). + Joins("INNER JOIN invGroups ON invTypes.groupID = invGroups.groupID"). + Where("invTypes.typeID IN ? AND invGroups.categoryID = ?", uncached, droneCategoryID). + Pluck("invTypes.typeID", &droneTypeIDs).Error if err != nil { return nil, err } @@ -285,29 +453,54 @@ func (db *DBWrapper) getModuleSlots(moduleIDs []int64) (map[int64]string, error) func (db *DBWrapper) calculateModuleStats(killmailIDs []int64, stats *FitStatistics, total int64, flog *logger.Logger) error { flog.Debug("Querying module stats for %d killmails", len(killmailIDs)) - var items []struct { - ItemTypeID int32 `gorm:"column:item_type_id"` - Flag int64 `gorm:"column:flag"` - Count int64 `gorm:"column:count"` + if len(killmailIDs) == 0 { + return nil } - err := db.db.Table("public.zkill_items"). - Select("zkill_items.item_type_id, zkill_items.flag, COUNT(DISTINCT zkill_victims.killmail_id) as count"). - Joins("INNER JOIN public.zkill_victims ON zkill_items.victim_id = zkill_victims.id"). - Where("zkill_victims.killmail_id IN ?", killmailIDs). - Group("zkill_items.item_type_id, zkill_items.flag"). - Find(&items).Error + ctx := context.Background() + + // Build IN clause with placeholders + placeholders := make([]string, len(killmailIDs)) + args := make([]interface{}, len(killmailIDs)) + for i, id := range killmailIDs { + placeholders[i] = "?" + args[i] = id + } + query := fmt.Sprintf("SELECT item_type_id, flag, count(DISTINCT killmail_id) as count FROM fitted_modules WHERE killmail_id IN (%s) GROUP BY item_type_id, flag", + strings.Join(placeholders, ",")) + + rows, err := db.ch.Query(ctx, query, args...) if err != nil { flog.Error("Failed to query module stats: %v", err) return err } + defer rows.Close() + + var items []struct { + ItemTypeID int32 + Flag int64 + Count uint64 + } + + for rows.Next() { + var item struct { + ItemTypeID int32 + Flag int64 + Count uint64 + } + if err := rows.Scan(&item.ItemTypeID, &item.Flag, &item.Count); err != nil { + flog.Error("Failed to scan module stat: %v", err) + return err + } + items = append(items, item) + } flog.Debug("Found %d item type/flag combinations", len(items)) for _, item := range items { percentage := float64(item.Count) / float64(total) * 100.0 moduleStats := ModuleStats{ - Count: item.Count, + Count: int64(item.Count), Percentage: percentage, } diff --git a/main.go b/main.go index 645e2ca..1a6f221 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,8 @@ func main() { ingest := flag.Bool("ingest", false, "ingest killmails from data directory") flag.Parse() logger.InitFlag() + logger.Info("Starting") + db, err := GetDB() if err != nil { logger.Error("Failed to get database: %v", err) @@ -21,11 +23,6 @@ func main() { } if *ingest { - err = db.DB().AutoMigrate(&Killmail{}, &Attacker{}, &Victim{}, &Item{}, &Position{}) - if err != nil { - logger.Error("Failed to migrate database: %v", err) - return - } DoIngest() }