package main import ( "context" "encoding/json" "fmt" "io" "net/http" "sync" "time" "zkillsusser/models" logger "git.site.quack-lab.dev/dave/cylogger" utils "git.site.quack-lab.dev/dave/cyutils" "github.com/nsqio/go-nsq" "golang.org/x/sync/errgroup" ) type FlatKillmail struct { KillmailID int64 `json:"killmail_id"` KillmailHash string `json:"killmail_hash"` KillmailTime string `json:"killmail_time"` SolarSystemID int32 `json:"solar_system_id"` SolarSystemName string `json:"solar_system_name"` ConstellationName string `json:"constellation_name"` RegionName string `json:"region_name"` Security float32 `json:"security"` VictimCharacterID int64 `json:"victim_character_id"` VictimCharacterName string `json:"victim_character_name"` VictimCorporationID int64 `json:"victim_corporation_id"` VictimCorporationName string `json:"victim_corporation_name"` VictimAllianceID *int64 `json:"victim_alliance_id"` VictimAllianceName string `json:"victim_alliance_name"` VictimShipTypeID int32 `json:"victim_ship_type_id"` VictimShipTypeName string `json:"victim_ship_type_name"` VictimShipGroupName string `json:"victim_ship_group_name"` VictimShipCategoryName string `json:"victim_ship_category_name"` VictimDamageTaken int64 `json:"victim_damage_taken"` AttackerCount uint16 `json:"attacker_count"` HTTPLastModified string `json:"http_last_modified"` } type FlatKillmailAttacker struct { KillmailID int64 `json:"killmail_id"` CharacterID int64 `json:"character_id"` CharacterName string `json:"character_name"` CorporationID int64 `json:"corporation_id"` CorporationName string `json:"corporation_name"` AllianceID *int64 `json:"alliance_id"` AllianceName string `json:"alliance_name"` ShipTypeID int32 `json:"ship_type_id"` ShipTypeName string `json:"ship_type_name"` ShipGroupName string `json:"ship_group_name"` WeaponTypeID int32 `json:"weapon_type_id"` WeaponTypeName string `json:"weapon_type_name"` DamageDone int64 `json:"damage_done"` FinalBlow bool `json:"final_blow"` SecurityStatus float32 `json:"security_status"` } type FlatKillmailItem struct { KillmailID int64 `json:"killmail_id"` ItemTypeID int32 `json:"item_type_id"` ItemTypeName string `json:"item_type_name"` ItemGroupName string `json:"item_group_name"` ItemCategoryName string `json:"item_category_name"` ItemMarketGroupName string `json:"item_market_group_name"` Flag int32 `json:"flag"` SlotType string `json:"slot_type"` QuantityDestroyed int64 `json:"quantity_destroyed"` QuantityDropped int64 `json:"quantity_dropped"` Singleton int32 `json:"singleton"` } type Cache[T any, K comparable] struct { m sync.Map getter func(ctx context.Context, db DB, key K) (T, error) logger func(key K) *logger.Logger } func NewCache[T any, K comparable](getter func(ctx context.Context, db DB, key K) (T, error), logger func(key K) *logger.Logger) *Cache[T, K] { return &Cache[T, K]{ getter: getter, logger: logger, } } func (c *Cache[T, K]) Get(ctx context.Context, db DB, key K) (T, error) { var zero T val, found := c.m.Load(key) if found { return val.(T), nil } flog := c.logger(key) flog.Debug("Querying database") result, err := c.getter(ctx, db, key) if err != nil { flog.Error("Failed to get: %v", err) return zero, err } c.m.Store(key, result) flog.Debug("Cached") return result, nil } type FlatCache struct { types *Cache[*models.InvType, int32] groups *Cache[*models.InvGroup, int32] categories *Cache[*models.InvCategory, int32] marketGroups *Cache[*models.InvMarketGroup, int32] systems *Cache[*models.MapSolarSystem, int32] constellations *Cache[*models.MapConstellation, int32] regions *Cache[*models.MapRegion, int32] } func getTypeFromDB(ctx context.Context, db DB, typeID int32) (*models.InvType, error) { return db.GetType(ctx, typeID) } func getGroupFromDB(ctx context.Context, db DB, groupID int32) (*models.InvGroup, error) { return db.GetGroup(ctx, groupID) } func getCategoryFromDB(ctx context.Context, db DB, categoryID int32) (*models.InvCategory, error) { return db.GetCategory(ctx, categoryID) } func getMarketGroupFromDB(ctx context.Context, db DB, marketGroupID int32) (*models.InvMarketGroup, error) { return db.GetMarketGroup(ctx, marketGroupID) } func getSolarSystemFromDB(ctx context.Context, db DB, systemID int32) (*models.MapSolarSystem, error) { return db.GetSolarSystem(ctx, systemID) } func getConstellationFromDB(ctx context.Context, db DB, constellationID int32) (*models.MapConstellation, error) { return db.GetConstellation(ctx, constellationID) } func getRegionFromDB(ctx context.Context, db DB, regionID int32) (*models.MapRegion, error) { return db.GetRegion(ctx, regionID) } var globalFlatCache = &FlatCache{ types: NewCache(getTypeFromDB, func(key int32) *logger.Logger { return logger.Default.WithPrefix("getType").WithPrefix(fmt.Sprintf("type_%d", key)) }), groups: NewCache(getGroupFromDB, func(key int32) *logger.Logger { return logger.Default.WithPrefix("getGroup").WithPrefix(fmt.Sprintf("group_%d", key)) }), categories: NewCache(getCategoryFromDB, func(key int32) *logger.Logger { return logger.Default.WithPrefix("getCategory").WithPrefix(fmt.Sprintf("category_%d", key)) }), marketGroups: NewCache(getMarketGroupFromDB, func(key int32) *logger.Logger { return logger.Default.WithPrefix("getMarketGroup").WithPrefix(fmt.Sprintf("marketgroup_%d", key)) }), systems: NewCache(getSolarSystemFromDB, func(key int32) *logger.Logger { return logger.Default.WithPrefix("getSolarSystem").WithPrefix(fmt.Sprintf("system_%d", key)) }), constellations: NewCache(getConstellationFromDB, func(key int32) *logger.Logger { return logger.Default.WithPrefix("getConstellation").WithPrefix(fmt.Sprintf("constellation_%d", key)) }), regions: NewCache(getRegionFromDB, func(key int32) *logger.Logger { return logger.Default.WithPrefix("getRegion").WithPrefix(fmt.Sprintf("region_%d", key)) }), } func FlattenKillmail(db DB, killmail Killmail) (*FlatKillmail, []FlatKillmailAttacker, []FlatKillmailItem, error) { flog := logger.Default.WithPrefix("FlattenKillmail").WithPrefix(fmt.Sprintf("killmail_%d", killmail.KillmailID)) flat := &FlatKillmail{ KillmailID: killmail.KillmailID, KillmailHash: killmail.KillmailHash, KillmailTime: killmail.KillmailTime.Format("2006-01-02 15:04:05"), HTTPLastModified: killmail.HTTPLastModified.Format("2006-01-02 15:04:05"), AttackerCount: uint16(len(killmail.Attackers)), } g, ctx := errgroup.WithContext(context.Background()) g.Go(func() error { start := time.Now() err := flattenSolarSystem(ctx, db, int32(killmail.SolarSystemID), flat) flog.Debug("flattenSolarSystem took %v", time.Since(start)) return err }) g.Go(func() error { start := time.Now() err := flattenVictim(ctx, db, killmail.Victim, flat) flog.Debug("flattenVictim took %v", time.Since(start)) return err }) if err := g.Wait(); err != nil { flog.Error("Failed to flatten killmail: %v", err) return nil, nil, nil, err } flog.Debug("Flattening %d attackers", len(killmail.Attackers)) attackers := make([]FlatKillmailAttacker, len(killmail.Attackers)) g2, ctx2 := errgroup.WithContext(ctx) for i, attacker := range killmail.Attackers { i, attacker := i, attacker // capture loop variables g2.Go(func() error { attackerLog := flog.WithPrefix(fmt.Sprintf("attacker_%d", i)) flatAttacker, err := flattenAttacker(ctx2, db, killmail.KillmailID, attacker) if err != nil { attackerLog.Error("Failed to flatten attacker: %v", err) return err } attackers[i] = *flatAttacker return nil }) } if err := g2.Wait(); err != nil { return nil, nil, nil, err } flog.Debug("Flattening %d items", len(killmail.Victim.Items)) items := make([]FlatKillmailItem, 0, len(killmail.Victim.Items)) for i, item := range killmail.Victim.Items { itemLog := flog.WithPrefix(fmt.Sprintf("item_%d", i)) flatItem, err := flattenItemType(ctx, db, killmail.KillmailID, item) if err != nil { itemLog.Error("Failed to flatten item: %v", err) return nil, nil, nil, err } items = append(items, *flatItem) } return flat, attackers, items, nil } func flattenSolarSystem(ctx context.Context, db DB, systemID int32, flat *FlatKillmail) error { flog := logger.Default.WithPrefix("flattenSolarSystem").WithPrefix(fmt.Sprintf("system_%d", systemID)) flog.Debug("Fetching solar system") system, err := globalFlatCache.systems.Get(ctx, db, systemID) if err != nil { return err } flat.SolarSystemID = system.SolarSystemID flat.SolarSystemName = system.SolarSystemName flat.Security = system.Security flog.Debug("Fetching constellation %d", system.ConstellationID) constellation, err := globalFlatCache.constellations.Get(ctx, db, system.ConstellationID) if err != nil { return err } flat.ConstellationName = constellation.ConstellationName flog.Debug("Fetching region %d", constellation.RegionID) region, err := globalFlatCache.regions.Get(ctx, db, constellation.RegionID) if err != nil { return err } flat.RegionName = region.RegionName return nil } func flattenVictim(ctx context.Context, db DB, victim Victim, flat *FlatKillmail) error { flog := logger.Default.WithPrefix("flattenVictim") flog.Debug("Starting victim flattening") flat.VictimCharacterID = victim.CharacterID flat.VictimCorporationID = victim.CorporationID if victim.AllianceID != 0 { flat.VictimAllianceID = &victim.AllianceID } flat.VictimShipTypeID = int32(victim.ShipTypeID) flat.VictimDamageTaken = victim.DamageTaken g, ctx := errgroup.WithContext(ctx) if victim.CharacterID != 0 { g.Go(func() error { start := time.Now() flog.Debug("Fetching character name for ID %d", victim.CharacterID) name, err := getCharacterName(victim.CharacterID) if err != nil { flog.Debug("Character name fetch failed: %v", err) } flat.VictimCharacterName = name if name != "" { flog.Debug("Got character name: %s (took %v)", name, time.Since(start)) } else { flog.Debug("Character name empty (took %v)", time.Since(start)) } return nil }) } if victim.CorporationID != 0 { g.Go(func() error { start := time.Now() flog.Debug("Fetching corporation name for ID %d", victim.CorporationID) name, err := getCorporationName(victim.CorporationID) if err != nil { flog.Debug("Corporation name fetch failed: %v", err) } flat.VictimCorporationName = name if name != "" { flog.Debug("Got corporation name: %s (took %v)", name, time.Since(start)) } else { flog.Debug("Corporation name empty (took %v)", time.Since(start)) } return nil }) } if victim.AllianceID != 0 { g.Go(func() error { start := time.Now() flog.Debug("Fetching alliance name for ID %d", victim.AllianceID) name, err := getAllianceName(victim.AllianceID) if err != nil { flog.Debug("Alliance name fetch failed: %v", err) } flat.VictimAllianceName = name if name != "" { flog.Debug("Got alliance name: %s (took %v)", name, time.Since(start)) } else { flog.Debug("Alliance name empty (took %v)", time.Since(start)) } return nil }) } g.Go(func() error { start := time.Now() flog.Debug("Fetching ship type name for ID %d", victim.ShipTypeID) typeName, err := flattenTypeName(ctx, db, int32(victim.ShipTypeID)) if err != nil { return err } flat.VictimShipTypeName = typeName flog.Debug("Got ship type name: %s (took %v)", typeName, time.Since(start)) return nil }) g.Go(func() error { start := time.Now() flog.Debug("Fetching ship group name for type ID %d", victim.ShipTypeID) groupName, err := flattenGroupName(ctx, db, int32(victim.ShipTypeID)) if err != nil { return err } flat.VictimShipGroupName = groupName flog.Debug("Got ship group name: %s (took %v)", groupName, time.Since(start)) return nil }) g.Go(func() error { start := time.Now() flog.Debug("Fetching ship category name for type ID %d", victim.ShipTypeID) categoryName, err := flattenCategoryName(ctx, db, int32(victim.ShipTypeID)) if err != nil { return err } flat.VictimShipCategoryName = categoryName flog.Debug("Got ship category name: %s (took %v)", categoryName, time.Since(start)) return nil }) if err := g.Wait(); err != nil { flog.Error("Failed to flatten victim: %v", err) return err } return nil } func flattenAttacker(ctx context.Context, db DB, killmailID int64, attacker Attacker) (*FlatKillmailAttacker, error) { flog := logger.Default.WithPrefix("flattenAttacker").WithPrefix(fmt.Sprintf("character_%d", attacker.CharacterID)) flog.Debug("Starting attacker flattening") flat := &FlatKillmailAttacker{ KillmailID: killmailID, CharacterID: attacker.CharacterID, CorporationID: attacker.CorporationID, ShipTypeID: int32(attacker.ShipTypeID), WeaponTypeID: int32(attacker.WeaponTypeID), DamageDone: attacker.DamageDone, FinalBlow: attacker.FinalBlow, SecurityStatus: float32(attacker.SecurityStatus), } if attacker.AllianceID != 0 { flat.AllianceID = &attacker.AllianceID } g, ctx := errgroup.WithContext(ctx) if attacker.CharacterID != 0 { g.Go(func() error { start := time.Now() flog.Debug("Fetching character name") name, err := getCharacterName(attacker.CharacterID) if err != nil { flog.Debug("Character name fetch failed: %v", err) } flat.CharacterName = name if name != "" { flog.Debug("Got character name: %s (took %v)", name, time.Since(start)) } else { flog.Debug("Character name empty (took %v)", time.Since(start)) } return nil }) } if attacker.CorporationID != 0 { g.Go(func() error { start := time.Now() flog.Debug("Fetching corporation name") name, err := getCorporationName(attacker.CorporationID) if err != nil { flog.Debug("Corporation name fetch failed: %v", err) } flat.CorporationName = name if name != "" { flog.Debug("Got corporation name: %s (took %v)", name, time.Since(start)) } else { flog.Debug("Corporation name empty (took %v)", time.Since(start)) } return nil }) } if attacker.AllianceID != 0 { g.Go(func() error { start := time.Now() flog.Debug("Fetching alliance name") name, err := getAllianceName(attacker.AllianceID) if err != nil { flog.Debug("Alliance name fetch failed: %v", err) } flat.AllianceName = name if name != "" { flog.Debug("Got alliance name: %s (took %v)", name, time.Since(start)) } else { flog.Debug("Alliance name empty (took %v)", time.Since(start)) } return nil }) } g.Go(func() error { start := time.Now() flog.Debug("Fetching ship type name for ID %d", attacker.ShipTypeID) typeName, err := flattenTypeName(ctx, db, int32(attacker.ShipTypeID)) if err != nil { return err } flat.ShipTypeName = typeName flog.Debug("Got ship type name: %s (took %v)", typeName, time.Since(start)) return nil }) g.Go(func() error { start := time.Now() flog.Debug("Fetching ship group name for type ID %d", attacker.ShipTypeID) groupName, err := flattenGroupName(ctx, db, int32(attacker.ShipTypeID)) if err != nil { return err } flat.ShipGroupName = groupName flog.Debug("Got ship group name: %s (took %v)", groupName, time.Since(start)) return nil }) if attacker.WeaponTypeID != 0 { g.Go(func() error { start := time.Now() flog.Debug("Fetching weapon type name for ID %d", attacker.WeaponTypeID) typeName, err := flattenTypeName(ctx, db, int32(attacker.WeaponTypeID)) if err != nil { return err } flat.WeaponTypeName = typeName flog.Debug("Got weapon type name: %s (took %v)", typeName, time.Since(start)) return nil }) } if err := g.Wait(); err != nil { flog.Error("Failed to flatten attacker: %v", err) return nil, err } return flat, nil } func flattenItemType(ctx context.Context, db DB, killmailID int64, item Item) (*FlatKillmailItem, error) { flog := logger.Default.WithPrefix("flattenItemType").WithPrefix(fmt.Sprintf("item_%d", item.ItemTypeID)) flog.Debug("Starting item flattening") flat := &FlatKillmailItem{ KillmailID: killmailID, ItemTypeID: int32(item.ItemTypeID), Flag: int32(item.Flag), QuantityDestroyed: derefInt64(item.QuantityDestroyed), QuantityDropped: derefInt64(item.QuantityDropped), Singleton: int32(item.Singleton), } g, ctx := errgroup.WithContext(ctx) g.Go(func() error { start := time.Now() flog.Debug("Fetching item type name") typeName, err := flattenTypeName(ctx, db, int32(item.ItemTypeID)) if err != nil { return err } flat.ItemTypeName = typeName flog.Debug("Got item type name: %s (took %v)", typeName, time.Since(start)) return nil }) g.Go(func() error { start := time.Now() flog.Debug("Fetching item group name") groupName, err := flattenGroupName(ctx, db, int32(item.ItemTypeID)) if err != nil { return err } flat.ItemGroupName = groupName flog.Debug("Got item group name: %s (took %v)", groupName, time.Since(start)) return nil }) g.Go(func() error { start := time.Now() flog.Debug("Fetching item category name") categoryName, err := flattenCategoryName(ctx, db, int32(item.ItemTypeID)) if err != nil { return err } flat.ItemCategoryName = categoryName flog.Debug("Got item category name: %s (took %v)", categoryName, time.Since(start)) return nil }) g.Go(func() error { start := time.Now() flog.Debug("Fetching item market group name") marketGroupName, err := flattenMarketGroupName(ctx, db, int32(item.ItemTypeID)) if err != nil { return err } flat.ItemMarketGroupName = marketGroupName flog.Debug("Got item market group name: %s (took %v)", marketGroupName, time.Since(start)) return nil }) if err := g.Wait(); err != nil { flog.Error("Failed to flatten item: %v", err) return nil, err } return flat, nil } func flattenTypeName(ctx context.Context, db DB, typeID int32) (string, error) { flog := logger.Default.WithPrefix("flattenTypeName").WithPrefix(fmt.Sprintf("type_%d", typeID)) flog.Debug("Fetching type name") itemType, err := globalFlatCache.types.Get(ctx, db, typeID) if err != nil { return "", err } flog.Debug("Got type name: %s", itemType.TypeName) return itemType.TypeName, nil } func flattenGroupName(ctx context.Context, db DB, typeID int32) (string, error) { flog := logger.Default.WithPrefix("flattenGroupName").WithPrefix(fmt.Sprintf("type_%d", typeID)) flog.Debug("Fetching group name") itemType, err := globalFlatCache.types.Get(ctx, db, typeID) if err != nil { return "", err } group, err := globalFlatCache.groups.Get(ctx, db, itemType.GroupID) if err != nil { return "", err } flog.Debug("Got group name: %s", group.GroupName) return group.GroupName, nil } func flattenCategoryName(ctx context.Context, db DB, typeID int32) (string, error) { flog := logger.Default.WithPrefix("flattenCategoryName").WithPrefix(fmt.Sprintf("type_%d", typeID)) flog.Debug("Fetching category name") itemType, err := globalFlatCache.types.Get(ctx, db, typeID) if err != nil { return "", err } group, err := globalFlatCache.groups.Get(ctx, db, itemType.GroupID) if err != nil { return "", err } category, err := globalFlatCache.categories.Get(ctx, db, group.CategoryID) if err != nil { return "", err } flog.Debug("Got category name: %s", category.CategoryName) return category.CategoryName, nil } func flattenMarketGroupName(ctx context.Context, db DB, typeID int32) (string, error) { flog := logger.Default.WithPrefix("flattenMarketGroupName").WithPrefix(fmt.Sprintf("type_%d", typeID)) flog.Debug("Fetching market group name") itemType, err := globalFlatCache.types.Get(ctx, db, typeID) if err != nil { return "", err } if itemType.MarketGroupID == 0 { flog.Debug("Type has no market group") return "", nil } marketGroup, err := globalFlatCache.marketGroups.Get(ctx, db, itemType.MarketGroupID) if err != nil { return "", err } flog.Debug("Got market group name: %s", marketGroup.MarketGroupName) return marketGroup.MarketGroupName, nil } type namedEntity interface { GetName() string } func (c Character) GetName() string { return c.Name } func (c Corporation) GetName() string { return c.Name } func (a Alliance) GetName() string { return a.Name } var ( flattenerConsumer *nsq.Consumer flattenerMaxInFlight int flattenerPaused bool flattenerPauseMu sync.Mutex // Memoized ESI name lookup functions getCharacterNameMemo func(int64) (string, error) getCorporationNameMemo func(int64) (string, error) getAllianceNameMemo func(int64) (string, error) ) func init() { // Use MemoizedBloom to only cache names that appear multiple times // Capacity: 100k unique IDs, false positive rate: 1% getCharacterNameMemo = utils.MemoizedBloom(getCharacterNameImpl, 100000, 0.01).(func(int64) (string, error)) getCorporationNameMemo = utils.MemoizedBloom(getCorporationNameImpl, 100000, 0.01).(func(int64) (string, error)) getAllianceNameMemo = utils.MemoizedBloom(getAllianceNameImpl, 50000, 0.01).(func(int64) (string, error)) } func pauseConsumer() { flattenerPauseMu.Lock() defer flattenerPauseMu.Unlock() // If already paused, do nothing if flattenerPaused { return } // Pause the consumer if flattenerConsumer != nil { flattenerConsumer.ChangeMaxInFlight(0) flattenerPaused = true logger.Default.Info("Paused NSQ consumer due to rate limit for 15 minutes") // Resume after 15 minutes go func() { time.Sleep(15 * time.Minute) flattenerPauseMu.Lock() defer flattenerPauseMu.Unlock() if flattenerConsumer != nil { flattenerConsumer.ChangeMaxInFlight(flattenerMaxInFlight) flattenerPaused = false logger.Default.Info("Resumed NSQ consumer after rate limit pause") } }() } } func getName[T namedEntity](entityType, cachePrefix string, entityID int64) (string, error) { flog := logger.Default.WithPrefix(fmt.Sprintf("get%sName", entityType)).WithPrefix(fmt.Sprintf("%s_%d", cachePrefix, entityID)) esiURL := fmt.Sprintf("https://esi.evetech.net/%s/%d", cachePrefix, entityID) proxyURL := fmt.Sprintf("https://proxy.site.quack-lab.dev?url=%s", esiURL) flog.Debug("Fetching %s name from ESI", entityType) flog.Debug("ESI URL: %s", esiURL) flog.Debug("Proxy URL: %s", proxyURL) resp, err := http.Get(proxyURL) if err != nil { flog.Debug("%s request failed: %v", entityType, err) return "", err } defer resp.Body.Close() if resp.StatusCode == http.StatusOK { body, err := io.ReadAll(resp.Body) if err != nil { flog.Debug("Failed to read response body: %v", err) return "", err } var entity T if err := json.Unmarshal(body, &entity); err != nil { flog.Debug("Failed to unmarshal response: %v", err) return "", err } flog.Debug("Successfully got %s name: %s", entityType, entity.GetName()) return entity.GetName(), nil } if resp.StatusCode == http.StatusNotFound { io.Copy(io.Discard, resp.Body) flog.Debug("%s not found (404)", entityType) return "", nil } if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == 429 || resp.StatusCode >= 500 { io.Copy(io.Discard, resp.Body) flog.Debug("Rate limited or server error (status %d), pausing consumer", resp.StatusCode) pauseConsumer() return "", nil } io.Copy(io.Discard, resp.Body) flog.Debug("%s request failed with status %d", entityType, resp.StatusCode) return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode) } func getCharacterName(characterID int64) (string, error) { return getCharacterNameMemo(characterID) } func getCorporationName(corporationID int64) (string, error) { return getCorporationNameMemo(corporationID) } func getAllianceName(allianceID int64) (string, error) { return getAllianceNameMemo(allianceID) } func getCharacterNameImpl(characterID int64) (string, error) { return getName[Character]("Character", "character", characterID) } func getCorporationNameImpl(corporationID int64) (string, error) { return getName[Corporation]("Corporation", "corporation", corporationID) } func getAllianceNameImpl(allianceID int64) (string, error) { return getName[Alliance]("Alliance", "alliance", allianceID) }