Rework the types to separate package

This commit is contained in:
2026-01-26 09:24:39 +01:00
parent 045930025c
commit 600fc0d7ff
9 changed files with 269 additions and 975 deletions

View File

@@ -303,14 +303,14 @@ func (db *DBWrapper) QueryShipByVictim(ctx context.Context, filters AnalyticsFil
query := fmt.Sprintf(`
SELECT
victim_ship_type_id,
victim_ship_type_name,
victim_ship_group_name,
victim_ship_category_name,
any(victim_ship_type_name) as victim_ship_type_name,
any(victim_ship_group_name) as victim_ship_group_name,
any(victim_ship_category_name) as victim_ship_category_name,
count() as kill_count,
count(DISTINCT victim_character_id) as unique_pilots_killed
FROM zkill.killmails
%s
GROUP BY victim_ship_type_id, victim_ship_type_name, victim_ship_group_name, victim_ship_category_name
GROUP BY victim_ship_type_id
ORDER BY kill_count DESC
`, whereClause)
@@ -903,4 +903,4 @@ func (db *DBWrapper) QueryKillmailWithItems(ctx context.Context, killmailID int6
result.Items = items
flog.Info("Query returned killmail %d with %d items", killmailID, len(items))
return result, nil
}
}

View File

@@ -1,5 +1,8 @@
services:
backend:
image: docker.site.quack-lab.dev/zkill-susser-backend:latest
volumes:
- sqlite-latest.sqlite:/sqlite-latest.sqlite
image: docker.site.quack-lab.dev/zkill-susser-backend:v1.1.0
environment:
CLICKHOUSE_HOST: "clickhouse-zkill.site.quack-lab.dev"
CLICKHOUSE_DATABASE: "zkill"
CLICKHOUSE_USERNAME: "default"
CLICKHOUSE_PASSWORD: ""

788
enrich.go
View File

@@ -1,788 +0,0 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
"zkillsusser/models"
logger "git.site.quack-lab.dev/dave/cylogger"
utils "git.site.quack-lab.dev/dave/cyutils"
"github.com/nsqio/go-nsq"
"golang.org/x/sync/errgroup"
)
type FlatKillmail struct {
KillmailID int64 `json:"killmail_id"`
KillmailHash string `json:"killmail_hash"`
KillmailTime string `json:"killmail_time"`
SolarSystemID int32 `json:"solar_system_id"`
SolarSystemName string `json:"solar_system_name"`
ConstellationName string `json:"constellation_name"`
RegionName string `json:"region_name"`
Security float32 `json:"security"`
VictimCharacterID int64 `json:"victim_character_id"`
VictimCharacterName string `json:"victim_character_name"`
VictimCorporationID int64 `json:"victim_corporation_id"`
VictimCorporationName string `json:"victim_corporation_name"`
VictimAllianceID *int64 `json:"victim_alliance_id"`
VictimAllianceName string `json:"victim_alliance_name"`
VictimShipTypeID int32 `json:"victim_ship_type_id"`
VictimShipTypeName string `json:"victim_ship_type_name"`
VictimShipGroupName string `json:"victim_ship_group_name"`
VictimShipCategoryName string `json:"victim_ship_category_name"`
VictimDamageTaken int64 `json:"victim_damage_taken"`
AttackerCount uint16 `json:"attacker_count"`
HTTPLastModified string `json:"http_last_modified"`
}
type FlatKillmailAttacker struct {
KillmailID int64 `json:"killmail_id"`
CharacterID int64 `json:"character_id"`
CharacterName string `json:"character_name"`
CorporationID int64 `json:"corporation_id"`
CorporationName string `json:"corporation_name"`
AllianceID *int64 `json:"alliance_id"`
AllianceName string `json:"alliance_name"`
ShipTypeID int32 `json:"ship_type_id"`
ShipTypeName string `json:"ship_type_name"`
ShipGroupName string `json:"ship_group_name"`
WeaponTypeID int32 `json:"weapon_type_id"`
WeaponTypeName string `json:"weapon_type_name"`
DamageDone int64 `json:"damage_done"`
FinalBlow bool `json:"final_blow"`
SecurityStatus float32 `json:"security_status"`
}
type FlatKillmailItem struct {
KillmailID int64 `json:"killmail_id"`
ItemTypeID int32 `json:"item_type_id"`
ItemTypeName string `json:"item_type_name"`
ItemGroupName string `json:"item_group_name"`
ItemCategoryName string `json:"item_category_name"`
ItemMarketGroupName string `json:"item_market_group_name"`
Flag int32 `json:"flag"`
SlotType string `json:"slot_type"`
QuantityDestroyed int64 `json:"quantity_destroyed"`
QuantityDropped int64 `json:"quantity_dropped"`
Singleton int32 `json:"singleton"`
}
type Cache[T any, K comparable] struct {
m sync.Map
getter func(ctx context.Context, db DB, key K) (T, error)
logger func(key K) *logger.Logger
}
func NewCache[T any, K comparable](getter func(ctx context.Context, db DB, key K) (T, error), logger func(key K) *logger.Logger) *Cache[T, K] {
return &Cache[T, K]{
getter: getter,
logger: logger,
}
}
func (c *Cache[T, K]) Get(ctx context.Context, db DB, key K) (T, error) {
var zero T
val, found := c.m.Load(key)
if found {
return val.(T), nil
}
flog := c.logger(key)
flog.Debug("Querying database")
result, err := c.getter(ctx, db, key)
if err != nil {
flog.Error("Failed to get: %v", err)
return zero, err
}
c.m.Store(key, result)
flog.Debug("Cached")
return result, nil
}
type FlatCache struct {
types *Cache[*models.InvType, int32]
groups *Cache[*models.InvGroup, int32]
categories *Cache[*models.InvCategory, int32]
marketGroups *Cache[*models.InvMarketGroup, int32]
systems *Cache[*models.MapSolarSystem, int32]
constellations *Cache[*models.MapConstellation, int32]
regions *Cache[*models.MapRegion, int32]
}
func getTypeFromDB(ctx context.Context, db DB, typeID int32) (*models.InvType, error) {
return db.GetType(ctx, typeID)
}
func getGroupFromDB(ctx context.Context, db DB, groupID int32) (*models.InvGroup, error) {
return db.GetGroup(ctx, groupID)
}
func getCategoryFromDB(ctx context.Context, db DB, categoryID int32) (*models.InvCategory, error) {
return db.GetCategory(ctx, categoryID)
}
func getMarketGroupFromDB(ctx context.Context, db DB, marketGroupID int32) (*models.InvMarketGroup, error) {
return db.GetMarketGroup(ctx, marketGroupID)
}
func getSolarSystemFromDB(ctx context.Context, db DB, systemID int32) (*models.MapSolarSystem, error) {
return db.GetSolarSystem(ctx, systemID)
}
func getConstellationFromDB(ctx context.Context, db DB, constellationID int32) (*models.MapConstellation, error) {
return db.GetConstellation(ctx, constellationID)
}
func getRegionFromDB(ctx context.Context, db DB, regionID int32) (*models.MapRegion, error) {
return db.GetRegion(ctx, regionID)
}
var globalFlatCache = &FlatCache{
types: NewCache(getTypeFromDB, func(key int32) *logger.Logger {
return logger.Default.WithPrefix("getType").WithPrefix(fmt.Sprintf("type_%d", key))
}),
groups: NewCache(getGroupFromDB, func(key int32) *logger.Logger {
return logger.Default.WithPrefix("getGroup").WithPrefix(fmt.Sprintf("group_%d", key))
}),
categories: NewCache(getCategoryFromDB, func(key int32) *logger.Logger {
return logger.Default.WithPrefix("getCategory").WithPrefix(fmt.Sprintf("category_%d", key))
}),
marketGroups: NewCache(getMarketGroupFromDB, func(key int32) *logger.Logger {
return logger.Default.WithPrefix("getMarketGroup").WithPrefix(fmt.Sprintf("marketgroup_%d", key))
}),
systems: NewCache(getSolarSystemFromDB, func(key int32) *logger.Logger {
return logger.Default.WithPrefix("getSolarSystem").WithPrefix(fmt.Sprintf("system_%d", key))
}),
constellations: NewCache(getConstellationFromDB, func(key int32) *logger.Logger {
return logger.Default.WithPrefix("getConstellation").WithPrefix(fmt.Sprintf("constellation_%d", key))
}),
regions: NewCache(getRegionFromDB, func(key int32) *logger.Logger {
return logger.Default.WithPrefix("getRegion").WithPrefix(fmt.Sprintf("region_%d", key))
}),
}
func FlattenKillmail(db DB, killmail Killmail) (*FlatKillmail, []FlatKillmailAttacker, []FlatKillmailItem, error) {
flog := logger.Default.WithPrefix("FlattenKillmail").WithPrefix(fmt.Sprintf("killmail_%d", killmail.KillmailID))
flat := &FlatKillmail{
KillmailID: killmail.KillmailID,
KillmailHash: killmail.KillmailHash,
KillmailTime: killmail.KillmailTime.Format("2006-01-02 15:04:05"),
HTTPLastModified: killmail.HTTPLastModified.Format("2006-01-02 15:04:05"),
AttackerCount: uint16(len(killmail.Attackers)),
}
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
start := time.Now()
err := flattenSolarSystem(ctx, db, int32(killmail.SolarSystemID), flat)
flog.Debug("flattenSolarSystem took %v", time.Since(start))
return err
})
g.Go(func() error {
start := time.Now()
err := flattenVictim(ctx, db, killmail.Victim, flat)
flog.Debug("flattenVictim took %v", time.Since(start))
return err
})
if err := g.Wait(); err != nil {
flog.Error("Failed to flatten killmail: %v", err)
return nil, nil, nil, err
}
flog.Debug("Flattening %d attackers", len(killmail.Attackers))
attackers := make([]FlatKillmailAttacker, len(killmail.Attackers))
g2, ctx2 := errgroup.WithContext(ctx)
for i, attacker := range killmail.Attackers {
i, attacker := i, attacker // capture loop variables
g2.Go(func() error {
attackerLog := flog.WithPrefix(fmt.Sprintf("attacker_%d", i))
flatAttacker, err := flattenAttacker(ctx2, db, killmail.KillmailID, attacker)
if err != nil {
attackerLog.Error("Failed to flatten attacker: %v", err)
return err
}
attackers[i] = *flatAttacker
return nil
})
}
if err := g2.Wait(); err != nil {
return nil, nil, nil, err
}
flog.Debug("Flattening %d items", len(killmail.Victim.Items))
items := make([]FlatKillmailItem, 0, len(killmail.Victim.Items))
for i, item := range killmail.Victim.Items {
itemLog := flog.WithPrefix(fmt.Sprintf("item_%d", i))
flatItem, err := flattenItemType(ctx, db, killmail.KillmailID, item)
if err != nil {
itemLog.Error("Failed to flatten item: %v", err)
return nil, nil, nil, err
}
items = append(items, *flatItem)
}
return flat, attackers, items, nil
}
func flattenSolarSystem(ctx context.Context, db DB, systemID int32, flat *FlatKillmail) error {
flog := logger.Default.WithPrefix("flattenSolarSystem").WithPrefix(fmt.Sprintf("system_%d", systemID))
flog.Debug("Fetching solar system")
system, err := globalFlatCache.systems.Get(ctx, db, systemID)
if err != nil {
return err
}
flat.SolarSystemID = system.SolarSystemID
flat.SolarSystemName = system.SolarSystemName
flat.Security = system.Security
flog.Debug("Fetching constellation %d", system.ConstellationID)
constellation, err := globalFlatCache.constellations.Get(ctx, db, system.ConstellationID)
if err != nil {
return err
}
flat.ConstellationName = constellation.ConstellationName
flog.Debug("Fetching region %d", constellation.RegionID)
region, err := globalFlatCache.regions.Get(ctx, db, constellation.RegionID)
if err != nil {
return err
}
flat.RegionName = region.RegionName
return nil
}
func flattenVictim(ctx context.Context, db DB, victim Victim, flat *FlatKillmail) error {
flog := logger.Default.WithPrefix("flattenVictim")
flog.Debug("Starting victim flattening")
flat.VictimCharacterID = victim.CharacterID
flat.VictimCorporationID = victim.CorporationID
if victim.AllianceID != 0 {
flat.VictimAllianceID = &victim.AllianceID
}
flat.VictimShipTypeID = int32(victim.ShipTypeID)
flat.VictimDamageTaken = victim.DamageTaken
g, ctx := errgroup.WithContext(ctx)
if victim.CharacterID != 0 {
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching character name for ID %d", victim.CharacterID)
name, err := getCharacterName(victim.CharacterID)
if err != nil {
flog.Debug("Character name fetch failed: %v", err)
}
flat.VictimCharacterName = name
if name != "" {
flog.Debug("Got character name: %s (took %v)", name, time.Since(start))
} else {
flog.Debug("Character name empty (took %v)", time.Since(start))
}
return nil
})
}
if victim.CorporationID != 0 {
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching corporation name for ID %d", victim.CorporationID)
name, err := getCorporationName(victim.CorporationID)
if err != nil {
flog.Debug("Corporation name fetch failed: %v", err)
}
flat.VictimCorporationName = name
if name != "" {
flog.Debug("Got corporation name: %s (took %v)", name, time.Since(start))
} else {
flog.Debug("Corporation name empty (took %v)", time.Since(start))
}
return nil
})
}
if victim.AllianceID != 0 {
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching alliance name for ID %d", victim.AllianceID)
name, err := getAllianceName(victim.AllianceID)
if err != nil {
flog.Debug("Alliance name fetch failed: %v", err)
}
flat.VictimAllianceName = name
if name != "" {
flog.Debug("Got alliance name: %s (took %v)", name, time.Since(start))
} else {
flog.Debug("Alliance name empty (took %v)", time.Since(start))
}
return nil
})
}
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching ship type name for ID %d", victim.ShipTypeID)
typeName, err := flattenTypeName(ctx, db, int32(victim.ShipTypeID))
if err != nil {
return err
}
flat.VictimShipTypeName = typeName
flog.Debug("Got ship type name: %s (took %v)", typeName, time.Since(start))
return nil
})
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching ship group name for type ID %d", victim.ShipTypeID)
groupName, err := flattenGroupName(ctx, db, int32(victim.ShipTypeID))
if err != nil {
return err
}
flat.VictimShipGroupName = groupName
flog.Debug("Got ship group name: %s (took %v)", groupName, time.Since(start))
return nil
})
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching ship category name for type ID %d", victim.ShipTypeID)
categoryName, err := flattenCategoryName(ctx, db, int32(victim.ShipTypeID))
if err != nil {
return err
}
flat.VictimShipCategoryName = categoryName
flog.Debug("Got ship category name: %s (took %v)", categoryName, time.Since(start))
return nil
})
if err := g.Wait(); err != nil {
flog.Error("Failed to flatten victim: %v", err)
return err
}
return nil
}
func flattenAttacker(ctx context.Context, db DB, killmailID int64, attacker Attacker) (*FlatKillmailAttacker, error) {
flog := logger.Default.WithPrefix("flattenAttacker").WithPrefix(fmt.Sprintf("character_%d", attacker.CharacterID))
flog.Debug("Starting attacker flattening")
flat := &FlatKillmailAttacker{
KillmailID: killmailID,
CharacterID: attacker.CharacterID,
CorporationID: attacker.CorporationID,
ShipTypeID: int32(attacker.ShipTypeID),
WeaponTypeID: int32(attacker.WeaponTypeID),
DamageDone: attacker.DamageDone,
FinalBlow: attacker.FinalBlow,
SecurityStatus: float32(attacker.SecurityStatus),
}
if attacker.AllianceID != 0 {
flat.AllianceID = &attacker.AllianceID
}
g, ctx := errgroup.WithContext(ctx)
if attacker.CharacterID != 0 {
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching character name")
name, err := getCharacterName(attacker.CharacterID)
if err != nil {
flog.Debug("Character name fetch failed: %v", err)
}
flat.CharacterName = name
if name != "" {
flog.Debug("Got character name: %s (took %v)", name, time.Since(start))
} else {
flog.Debug("Character name empty (took %v)", time.Since(start))
}
return nil
})
}
if attacker.CorporationID != 0 {
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching corporation name")
name, err := getCorporationName(attacker.CorporationID)
if err != nil {
flog.Debug("Corporation name fetch failed: %v", err)
}
flat.CorporationName = name
if name != "" {
flog.Debug("Got corporation name: %s (took %v)", name, time.Since(start))
} else {
flog.Debug("Corporation name empty (took %v)", time.Since(start))
}
return nil
})
}
if attacker.AllianceID != 0 {
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching alliance name")
name, err := getAllianceName(attacker.AllianceID)
if err != nil {
flog.Debug("Alliance name fetch failed: %v", err)
}
flat.AllianceName = name
if name != "" {
flog.Debug("Got alliance name: %s (took %v)", name, time.Since(start))
} else {
flog.Debug("Alliance name empty (took %v)", time.Since(start))
}
return nil
})
}
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching ship type name for ID %d", attacker.ShipTypeID)
typeName, err := flattenTypeName(ctx, db, int32(attacker.ShipTypeID))
if err != nil {
return err
}
flat.ShipTypeName = typeName
flog.Debug("Got ship type name: %s (took %v)", typeName, time.Since(start))
return nil
})
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching ship group name for type ID %d", attacker.ShipTypeID)
groupName, err := flattenGroupName(ctx, db, int32(attacker.ShipTypeID))
if err != nil {
return err
}
flat.ShipGroupName = groupName
flog.Debug("Got ship group name: %s (took %v)", groupName, time.Since(start))
return nil
})
if attacker.WeaponTypeID != 0 {
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching weapon type name for ID %d", attacker.WeaponTypeID)
typeName, err := flattenTypeName(ctx, db, int32(attacker.WeaponTypeID))
if err != nil {
return err
}
flat.WeaponTypeName = typeName
flog.Debug("Got weapon type name: %s (took %v)", typeName, time.Since(start))
return nil
})
}
if err := g.Wait(); err != nil {
flog.Error("Failed to flatten attacker: %v", err)
return nil, err
}
return flat, nil
}
func flattenItemType(ctx context.Context, db DB, killmailID int64, item Item) (*FlatKillmailItem, error) {
flog := logger.Default.WithPrefix("flattenItemType").WithPrefix(fmt.Sprintf("item_%d", item.ItemTypeID))
flog.Debug("Starting item flattening")
flat := &FlatKillmailItem{
KillmailID: killmailID,
ItemTypeID: int32(item.ItemTypeID),
Flag: int32(item.Flag),
QuantityDestroyed: derefInt64(item.QuantityDestroyed),
QuantityDropped: derefInt64(item.QuantityDropped),
Singleton: int32(item.Singleton),
}
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching item type name")
typeName, err := flattenTypeName(ctx, db, int32(item.ItemTypeID))
if err != nil {
return err
}
flat.ItemTypeName = typeName
flog.Debug("Got item type name: %s (took %v)", typeName, time.Since(start))
return nil
})
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching item group name")
groupName, err := flattenGroupName(ctx, db, int32(item.ItemTypeID))
if err != nil {
return err
}
flat.ItemGroupName = groupName
flog.Debug("Got item group name: %s (took %v)", groupName, time.Since(start))
return nil
})
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching item category name")
categoryName, err := flattenCategoryName(ctx, db, int32(item.ItemTypeID))
if err != nil {
return err
}
flat.ItemCategoryName = categoryName
flog.Debug("Got item category name: %s (took %v)", categoryName, time.Since(start))
return nil
})
g.Go(func() error {
start := time.Now()
flog.Debug("Fetching item market group name")
marketGroupName, err := flattenMarketGroupName(ctx, db, int32(item.ItemTypeID))
if err != nil {
return err
}
flat.ItemMarketGroupName = marketGroupName
flog.Debug("Got item market group name: %s (took %v)", marketGroupName, time.Since(start))
return nil
})
if err := g.Wait(); err != nil {
flog.Error("Failed to flatten item: %v", err)
return nil, err
}
return flat, nil
}
func flattenTypeName(ctx context.Context, db DB, typeID int32) (string, error) {
flog := logger.Default.WithPrefix("flattenTypeName").WithPrefix(fmt.Sprintf("type_%d", typeID))
flog.Debug("Fetching type name")
itemType, err := globalFlatCache.types.Get(ctx, db, typeID)
if err != nil {
return "", err
}
flog.Debug("Got type name: %s", itemType.TypeName)
return itemType.TypeName, nil
}
func flattenGroupName(ctx context.Context, db DB, typeID int32) (string, error) {
flog := logger.Default.WithPrefix("flattenGroupName").WithPrefix(fmt.Sprintf("type_%d", typeID))
flog.Debug("Fetching group name")
itemType, err := globalFlatCache.types.Get(ctx, db, typeID)
if err != nil {
return "", err
}
group, err := globalFlatCache.groups.Get(ctx, db, itemType.GroupID)
if err != nil {
return "", err
}
flog.Debug("Got group name: %s", group.GroupName)
return group.GroupName, nil
}
func flattenCategoryName(ctx context.Context, db DB, typeID int32) (string, error) {
flog := logger.Default.WithPrefix("flattenCategoryName").WithPrefix(fmt.Sprintf("type_%d", typeID))
flog.Debug("Fetching category name")
itemType, err := globalFlatCache.types.Get(ctx, db, typeID)
if err != nil {
return "", err
}
group, err := globalFlatCache.groups.Get(ctx, db, itemType.GroupID)
if err != nil {
return "", err
}
category, err := globalFlatCache.categories.Get(ctx, db, group.CategoryID)
if err != nil {
return "", err
}
flog.Debug("Got category name: %s", category.CategoryName)
return category.CategoryName, nil
}
func flattenMarketGroupName(ctx context.Context, db DB, typeID int32) (string, error) {
flog := logger.Default.WithPrefix("flattenMarketGroupName").WithPrefix(fmt.Sprintf("type_%d", typeID))
flog.Debug("Fetching market group name")
itemType, err := globalFlatCache.types.Get(ctx, db, typeID)
if err != nil {
return "", err
}
if itemType.MarketGroupID == 0 {
flog.Debug("Type has no market group")
return "", nil
}
marketGroup, err := globalFlatCache.marketGroups.Get(ctx, db, itemType.MarketGroupID)
if err != nil {
return "", err
}
flog.Debug("Got market group name: %s", marketGroup.MarketGroupName)
return marketGroup.MarketGroupName, nil
}
type namedEntity interface {
GetName() string
}
func (c Character) GetName() string { return c.Name }
func (c Corporation) GetName() string { return c.Name }
func (a Alliance) GetName() string { return a.Name }
var (
flattenerConsumer *nsq.Consumer
flattenerMaxInFlight int
flattenerPaused bool
flattenerPauseMu sync.Mutex
// Memoized ESI name lookup functions
getCharacterNameMemo func(int64) (string, error)
getCorporationNameMemo func(int64) (string, error)
getAllianceNameMemo func(int64) (string, error)
)
func init() {
// Use MemoizedBloom to only cache names that appear multiple times
// Capacity: 100k unique IDs, false positive rate: 1%
getCharacterNameMemo = utils.MemoizedBloom(getCharacterNameImpl, 100000, 0.01).(func(int64) (string, error))
getCorporationNameMemo = utils.MemoizedBloom(getCorporationNameImpl, 100000, 0.01).(func(int64) (string, error))
getAllianceNameMemo = utils.MemoizedBloom(getAllianceNameImpl, 50000, 0.01).(func(int64) (string, error))
}
func pauseConsumer() {
flattenerPauseMu.Lock()
defer flattenerPauseMu.Unlock()
// If already paused, do nothing
if flattenerPaused {
return
}
// Pause the consumer
if flattenerConsumer != nil {
flattenerConsumer.ChangeMaxInFlight(0)
flattenerPaused = true
logger.Default.Info("Paused NSQ consumer due to rate limit for 15 minutes")
// Resume after 15 minutes
go func() {
time.Sleep(15 * time.Minute)
flattenerPauseMu.Lock()
defer flattenerPauseMu.Unlock()
if flattenerConsumer != nil {
flattenerConsumer.ChangeMaxInFlight(flattenerMaxInFlight)
flattenerPaused = false
logger.Default.Info("Resumed NSQ consumer after rate limit pause")
}
}()
}
}
func getName[T namedEntity](entityType, cachePrefix string, entityID int64) (string, error) {
flog := logger.Default.WithPrefix(fmt.Sprintf("get%sName", entityType)).WithPrefix(fmt.Sprintf("%s_%d", cachePrefix, entityID))
esiURL := fmt.Sprintf("https://esi.evetech.net/%s/%d", cachePrefix, entityID)
proxyURL := fmt.Sprintf("https://proxy.site.quack-lab.dev?url=%s", esiURL)
flog.Debug("Fetching %s name from ESI", entityType)
flog.Debug("ESI URL: %s", esiURL)
flog.Debug("Proxy URL: %s", proxyURL)
resp, err := http.Get(proxyURL)
if err != nil {
flog.Debug("%s request failed: %v", entityType, err)
return "", err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err != nil {
flog.Debug("Failed to read response body: %v", err)
return "", err
}
var entity T
if err := json.Unmarshal(body, &entity); err != nil {
flog.Debug("Failed to unmarshal response: %v", err)
return "", err
}
flog.Debug("Successfully got %s name: %s", entityType, entity.GetName())
return entity.GetName(), nil
}
if resp.StatusCode == http.StatusNotFound {
io.Copy(io.Discard, resp.Body)
flog.Debug("%s not found (404)", entityType)
return "", nil
}
if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == 429 || resp.StatusCode >= 500 {
io.Copy(io.Discard, resp.Body)
flog.Debug("Rate limited or server error (status %d), pausing consumer", resp.StatusCode)
pauseConsumer()
return "", nil
}
io.Copy(io.Discard, resp.Body)
flog.Debug("%s request failed with status %d", entityType, resp.StatusCode)
return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
func getCharacterName(characterID int64) (string, error) {
return getCharacterNameMemo(characterID)
}
func getCorporationName(corporationID int64) (string, error) {
return getCorporationNameMemo(corporationID)
}
func getAllianceName(allianceID int64) (string, error) {
return getAllianceNameMemo(allianceID)
}
func getCharacterNameImpl(characterID int64) (string, error) {
return getName[Character]("Character", "character", characterID)
}
func getCorporationNameImpl(corporationID int64) (string, error) {
return getName[Corporation]("Corporation", "corporation", corporationID)
}
func getAllianceNameImpl(allianceID int64) (string, error) {
return getName[Alliance]("Alliance", "alliance", allianceID)
}

View File

@@ -16,12 +16,6 @@ var flattenerProcessedCount int64
func runFlattenerStage() {
logger.Info("Starting flattener stage")
db, err := GetDB()
if err != nil {
logger.Error("Failed to get database: %v", err)
return
}
config := nsq.NewConfig()
config.MaxAttempts = 0
config.MaxInFlight = stage2Workers
@@ -33,10 +27,6 @@ func runFlattenerStage() {
return
}
// Store consumer reference and MaxInFlight for rate limit pausing
flattenerConsumer = consumer
flattenerMaxInFlight = stage2Workers
producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), nsq.NewConfig())
if err != nil {
logger.Error("Error creating producer: %v", err)
@@ -44,7 +34,6 @@ func runFlattenerStage() {
}
defer producer.Stop()
// Ping to establish connection
if err := producer.Ping(); err != nil {
logger.Error("Failed to ping NSQ producer: %v", err)
return
@@ -52,7 +41,6 @@ func runFlattenerStage() {
for i := 0; i < stage2Workers; i++ {
handler := &FlattenerHandler{
db: db,
producer: producer,
workerID: i,
}
@@ -71,7 +59,6 @@ func runFlattenerStage() {
}
type FlattenerHandler struct {
db DB
producer *nsq.Producer
workerID int
}
@@ -105,11 +92,7 @@ func (h *FlattenerHandler) HandleMessage(message *nsq.Message) error {
messagelog = messagelog.WithPrefix(fmt.Sprintf("killmail_%d", killmail.KillmailID))
flatKillmail, flatAttackers, flatItems, err := FlattenKillmail(h.db, killmail)
if err != nil {
messagelog.Error("Failed to flatten killmail: %v", err)
return err
}
flatKillmail, flatAttackers, flatItems := FlattenKillmailSimple(killmail)
flatMsg := FlatKillmailMessage{
Killmail: flatKillmail,

40
main.go
View File

@@ -30,18 +30,48 @@ func main() {
return
}
logger.Error("No action specified. Set SERVER=true or STAGE=file-reader|flattener|inserter")
logger.Error("No action specified. Set SERVER=true or STAGE=1|2|3|4.1|4.2|4.3|5.1|5.2|5.3|6.1|6.2|6.3|7.1|7.2|7.3|8.1|8.2|8.3")
}
func runStage(stage string) {
switch stage {
case "file-reader":
case "1":
runFileReaderStage()
case "flattener":
case "2":
runFlattenerStage()
case "inserter":
case "3":
runInserterStage()
case "4.1":
runEnrichmentReaderStage("type")
case "4.2":
runEnrichmentEnricherStage("type")
case "4.3":
runEnrichmentUpdaterStage("type")
case "5.1":
runEnrichmentReaderStage("system")
case "5.2":
runEnrichmentEnricherStage("system")
case "5.3":
runEnrichmentUpdaterStage("system")
case "6.1":
runEnrichmentReaderStage("character")
case "6.2":
runEnrichmentEnricherStage("character")
case "6.3":
runEnrichmentUpdaterStage("character")
case "7.1":
runEnrichmentReaderStage("corporation")
case "7.2":
runEnrichmentEnricherStage("corporation")
case "7.3":
runEnrichmentUpdaterStage("corporation")
case "8.1":
runEnrichmentReaderStage("alliance")
case "8.2":
runEnrichmentEnricherStage("alliance")
case "8.3":
runEnrichmentUpdaterStage("alliance")
default:
logger.Error("Unknown stage: %s. Use: file-reader, flattener, inserter", stage)
logger.Error("Unknown stage: %s", stage)
}
}

156
types.go
View File

@@ -1,156 +0,0 @@
package main
import "time"
type Killmail struct {
Attackers []Attacker `json:"attackers"`
KillmailID int64 `json:"killmail_id"`
KillmailTime time.Time `json:"killmail_time"`
SolarSystemID int64 `json:"solar_system_id"`
Victim Victim `json:"victim"`
KillmailHash string `json:"killmail_hash"`
HTTPLastModified time.Time `json:"http_last_modified"`
}
type Attacker struct {
AllianceID int64 `json:"alliance_id"`
CharacterID int64 `json:"character_id"`
CorporationID int64 `json:"corporation_id"`
DamageDone int64 `json:"damage_done"`
FinalBlow bool `json:"final_blow"`
SecurityStatus float64 `json:"security_status"`
ShipTypeID int64 `json:"ship_type_id"`
WeaponTypeID int64 `json:"weapon_type_id"`
}
type Victim struct {
AllianceID int64 `json:"alliance_id"`
CharacterID int64 `json:"character_id"`
CorporationID int64 `json:"corporation_id"`
DamageTaken int64 `json:"damage_taken"`
Items []Item `json:"items"`
Position Position `json:"position"`
ShipTypeID int64 `json:"ship_type_id"`
}
type Item struct {
Flag int64 `json:"flag"`
ItemTypeID int64 `json:"item_type_id"`
QuantityDestroyed *int64 `json:"quantity_destroyed,omitempty"`
Singleton int64 `json:"singleton"`
QuantityDropped *int64 `json:"quantity_dropped,omitempty"`
}
type Position struct {
X float64 `json:"x"`
Y float64 `json:"y"`
Z float64 `json:"z"`
}
// Helper functions
func boolToUint8(b bool) uint8 {
if b {
return 1
}
return 0
}
func derefInt64(ptr *int64) int64 {
if ptr == nil {
return 0
}
return *ptr
}
type ModuleSlot string
var (
ModuleSlotLow ModuleSlot = "Low"
ModuleSlotMid ModuleSlot = "Mid"
ModuleSlotHigh ModuleSlot = "High"
ModuleSlotRig ModuleSlot = "Rig"
ModuleSlotSubsystem ModuleSlot = "Subsystem"
ModuleSlotDrone ModuleSlot = "Drone"
ModuleSlotOther ModuleSlot = "Other"
)
// region Other various types
type QueryParams struct {
Ship int64
Systems []int64
Modules []int64
Groups []int64
KillmailLimit int
}
type ModuleStatsData struct {
KillmailIDs []int64
}
type ComprehensiveStatsData struct {
QueryParams
KillmailLimit int
}
// CacheEntry stores both statistics (JSON) and images (blobs) in unified cache
// For 404s, we store a special marker: []byte{0xFF, 0xFE, 0xFD} (NOT_FOUND_MARKER)
type CacheEntry struct {
Key string `gorm:"primaryKey"`
Data []byte `gorm:"type:BLOB;not null"`
CreatedAt time.Time `gorm:"not null;index"`
}
var notFoundMarker = []byte{0xFF, 0xFE, 0xFD} // Special marker for cached 404s
func (CacheEntry) TableName() string {
return "cache_entries"
}
type FitStatistics struct {
TotalKillmails int64
ShipBreakdown map[int64]int64
SystemBreakdown map[int64]int64
HighSlotModules map[int32]int64
MidSlotModules map[int32]int64
LowSlotModules map[int32]int64
Rigs map[int32]int64
Drones map[int32]int64
KillmailIDs []int64
}
type Character struct {
AllianceID int64 `json:"alliance_id"`
Birthday time.Time `json:"birthday"`
BloodlineID int64 `json:"bloodline_id"`
CorporationID int64 `json:"corporation_id"`
Description string `json:"description"`
Gender string `json:"gender"`
Name string `json:"name"`
RaceID int64 `json:"race_id"`
SecurityStatus float64 `json:"security_status"`
}
type Corporation struct {
AllianceID int64 `json:"alliance_id"`
CeoID int64 `json:"ceo_id"`
CreatorID int64 `json:"creator_id"`
DateFounded time.Time `json:"date_founded"`
Description string `json:"description"`
HomeStationID int64 `json:"home_station_id"`
MemberCount int64 `json:"member_count"`
Name string `json:"name"`
Shares int64 `json:"shares"`
TaxRate float64 `json:"tax_rate"`
Ticker string `json:"ticker"`
URL string `json:"url"`
WarEligible bool `json:"war_eligible"`
}
type Alliance struct {
CreatorCorporationID int64 `json:"creator_corporation_id"`
CreatorID int64 `json:"creator_id"`
DateFounded time.Time `json:"date_founded"`
ExecutorCorporationID int64 `json:"executor_corporation_id"`
Name string `json:"name"`
Ticker string `json:"ticker"`
}

60
types/esi_killmail.go Normal file
View File

@@ -0,0 +1,60 @@
package types
import "time"
type Killmail struct {
Attackers []Attacker `json:"attackers"`
KillmailID int64 `json:"killmail_id"`
KillmailTime time.Time `json:"killmail_time"`
SolarSystemID int64 `json:"solar_system_id"`
Victim Victim `json:"victim"`
KillmailHash string `json:"killmail_hash"`
HTTPLastModified time.Time `json:"http_last_modified"`
}
type Attacker struct {
AllianceID int64 `json:"alliance_id"`
CharacterID int64 `json:"character_id"`
CorporationID int64 `json:"corporation_id"`
DamageDone int64 `json:"damage_done"`
FinalBlow bool `json:"final_blow"`
SecurityStatus float64 `json:"security_status"`
ShipTypeID int64 `json:"ship_type_id"`
WeaponTypeID int64 `json:"weapon_type_id"`
}
type Victim struct {
AllianceID int64 `json:"alliance_id"`
CharacterID int64 `json:"character_id"`
CorporationID int64 `json:"corporation_id"`
DamageTaken int64 `json:"damage_taken"`
Items []Item `json:"items"`
Position Position `json:"position"`
ShipTypeID int64 `json:"ship_type_id"`
}
type Item struct {
Flag int64 `json:"flag"`
ItemTypeID int64 `json:"item_type_id"`
QuantityDestroyed *int64 `json:"quantity_destroyed,omitempty"`
Singleton int64 `json:"singleton"`
QuantityDropped *int64 `json:"quantity_dropped,omitempty"`
}
type Position struct {
X float64 `json:"x"`
Y float64 `json:"y"`
Z float64 `json:"z"`
}
type ModuleSlot string
var (
ModuleSlotLow ModuleSlot = "Low"
ModuleSlotMid ModuleSlot = "Mid"
ModuleSlotHigh ModuleSlot = "High"
ModuleSlotRig ModuleSlot = "Rig"
ModuleSlotSubsystem ModuleSlot = "Subsystem"
ModuleSlotDrone ModuleSlot = "Drone"
ModuleSlotOther ModuleSlot = "Other"
)

40
types/esi_types.go Normal file
View File

@@ -0,0 +1,40 @@
package types
import "time"
type Character struct {
AllianceID int64 `json:"alliance_id"`
Birthday time.Time `json:"birthday"`
BloodlineID int64 `json:"bloodline_id"`
CorporationID int64 `json:"corporation_id"`
Description string `json:"description"`
Gender string `json:"gender"`
Name string `json:"name"`
RaceID int64 `json:"race_id"`
SecurityStatus float64 `json:"security_status"`
}
type Corporation struct {
AllianceID int64 `json:"alliance_id"`
CeoID int64 `json:"ceo_id"`
CreatorID int64 `json:"creator_id"`
DateFounded time.Time `json:"date_founded"`
Description string `json:"description"`
HomeStationID int64 `json:"home_station_id"`
MemberCount int64 `json:"member_count"`
Name string `json:"name"`
Shares int64 `json:"shares"`
TaxRate float64 `json:"tax_rate"`
Ticker string `json:"ticker"`
URL string `json:"url"`
WarEligible bool `json:"war_eligible"`
}
type Alliance struct {
CreatorCorporationID int64 `json:"creator_corporation_id"`
CreatorID int64 `json:"creator_id"`
DateFounded time.Time `json:"date_founded"`
ExecutorCorporationID int64 `json:"executor_corporation_id"`
Name string `json:"name"`
Ticker string `json:"ticker"`
}

122
types/flat_killmail.go Normal file
View File

@@ -0,0 +1,122 @@
package types
type FlatKillmail struct {
KillmailID int64 `json:"killmail_id"`
KillmailHash string `json:"killmail_hash"`
KillmailTime string `json:"killmail_time"`
SolarSystemID int32 `json:"solar_system_id"`
SolarSystemName string `json:"solar_system_name"`
ConstellationName string `json:"constellation_name"`
RegionName string `json:"region_name"`
Security float32 `json:"security"`
VictimCharacterID int64 `json:"victim_character_id"`
VictimCharacterName string `json:"victim_character_name"`
VictimCorporationID int64 `json:"victim_corporation_id"`
VictimCorporationName string `json:"victim_corporation_name"`
VictimAllianceID *int64 `json:"victim_alliance_id"`
VictimAllianceName string `json:"victim_alliance_name"`
VictimShipTypeID int32 `json:"victim_ship_type_id"`
VictimShipTypeName string `json:"victim_ship_type_name"`
VictimShipGroupName string `json:"victim_ship_group_name"`
VictimShipCategoryName string `json:"victim_ship_category_name"`
VictimDamageTaken int64 `json:"victim_damage_taken"`
AttackerCount uint16 `json:"attacker_count"`
HTTPLastModified string `json:"http_last_modified"`
}
type FlatKillmailAttacker struct {
KillmailID int64 `json:"killmail_id"`
CharacterID int64 `json:"character_id"`
CharacterName string `json:"character_name"`
CorporationID int64 `json:"corporation_id"`
CorporationName string `json:"corporation_name"`
AllianceID *int64 `json:"alliance_id"`
AllianceName string `json:"alliance_name"`
ShipTypeID int32 `json:"ship_type_id"`
ShipTypeName string `json:"ship_type_name"`
ShipGroupName string `json:"ship_group_name"`
WeaponTypeID int32 `json:"weapon_type_id"`
WeaponTypeName string `json:"weapon_type_name"`
DamageDone int64 `json:"damage_done"`
FinalBlow bool `json:"final_blow"`
SecurityStatus float32 `json:"security_status"`
}
type FlatKillmailItem struct {
KillmailID int64 `json:"killmail_id"`
ItemTypeID int32 `json:"item_type_id"`
ItemTypeName string `json:"item_type_name"`
ItemGroupName string `json:"item_group_name"`
ItemCategoryName string `json:"item_category_name"`
ItemMarketGroupName string `json:"item_market_group_name"`
Flag int32 `json:"flag"`
SlotType string `json:"slot_type"`
QuantityDestroyed int64 `json:"quantity_destroyed"`
QuantityDropped int64 `json:"quantity_dropped"`
Singleton int32 `json:"singleton"`
}
func (k *Killmail) Flatten() (*FlatKillmail, []FlatKillmailAttacker, []FlatKillmailItem) {
flat := &FlatKillmail{
KillmailID: k.KillmailID,
KillmailHash: k.KillmailHash,
KillmailTime: k.KillmailTime.Format("2006-01-02 15:04:05"),
HTTPLastModified: k.HTTPLastModified.Format("2006-01-02 15:04:05"),
AttackerCount: uint16(len(k.Attackers)),
SolarSystemID: int32(k.SolarSystemID),
VictimCharacterID: k.Victim.CharacterID,
VictimCorporationID: k.Victim.CorporationID,
VictimShipTypeID: int32(k.Victim.ShipTypeID),
VictimDamageTaken: k.Victim.DamageTaken,
}
if k.Victim.AllianceID != 0 {
flat.VictimAllianceID = &k.Victim.AllianceID
}
attackers := make([]FlatKillmailAttacker, len(k.Attackers))
for i, attacker := range k.Attackers {
flatAttacker := &FlatKillmailAttacker{
KillmailID: k.KillmailID,
CharacterID: attacker.CharacterID,
CorporationID: attacker.CorporationID,
ShipTypeID: int32(attacker.ShipTypeID),
WeaponTypeID: int32(attacker.WeaponTypeID),
DamageDone: attacker.DamageDone,
FinalBlow: attacker.FinalBlow,
SecurityStatus: float32(attacker.SecurityStatus),
}
if attacker.AllianceID != 0 {
flatAttacker.AllianceID = &attacker.AllianceID
}
attackers[i] = *flatAttacker
}
items := make([]FlatKillmailItem, 0, len(k.Victim.Items))
for _, item := range k.Victim.Items {
flatItem := FlatKillmailItem{
KillmailID: k.KillmailID,
ItemTypeID: int32(item.ItemTypeID),
Flag: int32(item.Flag),
QuantityDestroyed: derefInt64(item.QuantityDestroyed),
QuantityDropped: derefInt64(item.QuantityDropped),
Singleton: int32(item.Singleton),
}
items = append(items, flatItem)
}
return flat, attackers, items
}
func derefInt64(i *int64) int64 {
if i == nil {
return 0
}
return *i
}