diff --git a/main.go b/main.go index c4057fa..a4b7108 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/json" "flag" + "fmt" + "io" "net/http" "os" "path/filepath" @@ -20,14 +22,20 @@ import ( const nsqEndpoint = "https://nsq.site.quack-lab.dev/pub?topic=wowspy" var debug *bool -var messageQueue = make(chan string, 10000) var nsqWorkers = 32 +var allPlayersAchievementsGlobal = make(map[string][]NSQMessage) // PlayerName -> list of all their achievements +var allPlayerNamesGlobal = make(map[string]bool) // Set of all player names +var globalDataMutex = &sync.Mutex{} + func main() { root := flag.String("root", ".", "Root workdir") debug = flag.Bool("d", false, "Debug") flag.Parse() logger.InitFlag() + if *debug { + logger.SetLevel(logger.LevelDebug) // Assuming LevelDebug is the correct constant for cylogger + } logger.Info("Root: %q", *root) cleanedRoot := strings.Replace(*root, "~", os.Getenv("HOME"), 1) @@ -45,65 +53,126 @@ func main() { logger.Error("error matching Heimdall.lua: %v", err) return } - logger.Info("Found %d Heimdall.lua", len(matches)) - - wg := sync.WaitGroup{} - messages := make(chan NSQMessage, 1000) - for i := 0; i < nsqWorkers; i++ { - wg.Add(1) - go NsqWorker(&wg, messages) + logger.Info("Found %d Heimdall.lua files.", len(matches)) + if len(matches) == 0 { + logger.Info("No Heimdall.lua files found. Exiting.") + return } + // Debugging + matches = matches[:2] + + // --- Pass 1: Extract all data --- + logger.Info("Starting Pass 1: Extracting data from all Heimdall.lua files...") + var wgPass1 sync.WaitGroup for _, match := range matches { - logger.Info("Processing %q", match) - wg.Add(1) - go ParseHeimdallFile(filepath.Join(cleanedRoot, match), &wg, messages) + wgPass1.Add(1) + go extractPlayerAchievementsFromFile(filepath.Join(cleanedRoot, match), &wgPass1) } - for msg := range messages { - logger.Info("Message: %#v", msg) + wgPass1.Wait() + logger.Info("Finished Pass 1: Extracted data for %d unique players globally from %d files.", len(allPlayerNamesGlobal), len(matches)) + if *debug { + globalDataMutex.Lock() + logger.Debug("Total achievements collected globally: %d", countTotalAchievements(allPlayersAchievementsGlobal)) + globalDataMutex.Unlock() } - wg.Wait() - close(messages) - return + + // --- Process and Send to NSQ --- + logger.Info("Starting NSQ message publishing...") + nsqMessagesChan := make(chan NSQMessage, 10000) // Increased buffer size + var wgNsqWorkers sync.WaitGroup + for i := 0; i < nsqWorkers; i++ { + wgNsqWorkers.Add(1) + go NsqWorker(&wgNsqWorkers, nsqMessagesChan) + } + + go func() { + globalDataMutex.Lock() + defer globalDataMutex.Unlock() + for playerName, achList := range allPlayersAchievementsGlobal { + for _, ach := range achList { + // ach.Name is already correctly set during extraction + nsqMessagesChan <- ach + logger.Debug("Queued NSQ message for Player: %s, AchID: %s", playerName, ach.ID) + } + } + close(nsqMessagesChan) // Close channel when all messages are sent + logger.Info("All NSQ messages queued.") + }() + + // --- Pass 2: Update Lua file states (in memory) --- + logger.Info("Starting Pass 2: Updating Lua states (setting alreadySeen and clearing players)...") + var wgPass2 sync.WaitGroup + if len(allPlayerNamesGlobal) > 0 { // Only run pass 2 if there are players to report + for _, match := range matches { + wgPass2.Add(1) + go updateLuaFileState(filepath.Join(cleanedRoot, match), &wgPass2, allPlayerNamesGlobal) + } + wgPass2.Wait() + logger.Info("Finished Pass 2: Lua states updated where applicable.") + } else { + logger.Info("Skipping Pass 2 as no players were found globally.") + } + + wgNsqWorkers.Wait() // Wait for all NSQ messages to be processed + logger.Info("All NSQ workers finished. Program complete.") } -func ParseHeimdallFile(path string, wg *sync.WaitGroup, messages chan NSQMessage) { +// Helper function to count total achievements for debugging +func countTotalAchievements(achMap map[string][]NSQMessage) int { + count := 0 + for _, achList := range achMap { + count += len(achList) + } + return count +} + +// extractPlayerAchievementsFromFile is for Pass 1 +func extractPlayerAchievementsFromFile(path string, wg *sync.WaitGroup) { + logger.Info("Extracting achievements from %q", path) defer wg.Done() L := lua.NewState() defer L.Close() if err := L.DoFile(path); err != nil { - logger.Error("error executing Lua file %q: %v", path, err) + logger.Error("Pass 1: error executing Lua file %q: %v", path, err) return } heimdallAchievements := L.GetGlobal("Heimdall_Achievements") if heimdallAchievements.Type() == lua.LTNil { - logger.Error("Heimdall_Achievements not found in %q", path) + logger.Warning("Pass 1: Heimdall_Achievements not found in %q. Skipping file.", path) return } playersTableLua := L.GetField(heimdallAchievements, "players") + if playersTableLua.Type() == lua.LTNil { + logger.Info("Pass 1: 'players' table is nil in Heimdall_Achievements in %q. No player data to extract.", path) + return + } playersTable, ok := playersTableLua.(*lua.LTable) - if !ok || playersTableLua.Type() == lua.LTNil { - logger.Error("'players' table not found or not a table in Heimdall_Achievements in %q", path) + if !ok { + logger.Warning("Pass 1: 'players' field in Heimdall_Achievements is not a table in %q (type: %s). Skipping.", path, playersTableLua.Type().String()) return } + var filePlayerAchievements []NSQMessage // Temporary list for this file's achievements + var filePlayerNames = make(map[string]bool) // Temporary set for this file's player names + playersTable.ForEach(func(playerNameLua lua.LValue, playerAchievementsLua lua.LValue) { currentPlayerName := playerNameLua.String() - logger.Info("Processing player: %s in %q", currentPlayerName, path) + filePlayerNames[currentPlayerName] = true // Track name - achievementsTable, ok := playerAchievementsLua.(*lua.LTable) + achievementsTableLua, ok := playerAchievementsLua.(*lua.LTable) if !ok { - logger.Error("Achievements for player %s is not a table in %q", currentPlayerName, path) + logger.Error("Pass 1: Achievements for player %s is not a table in %q. Skipping achievements for this player.", currentPlayerName, path) return } - achievementsTable.ForEach(func(_ lua.LValue, achievementDataLua lua.LValue) { + achievementsTableLua.ForEach(func(_ lua.LValue, achievementDataLua lua.LValue) { achievementTable, ok := achievementDataLua.(*lua.LTable) if !ok { - logger.Error("Achievement data for player %s is not a table in %q", currentPlayerName, path) + logger.Error("Pass 1: Achievement data for player %s is not a table in %q. Skipping this achievement.", currentPlayerName, path) return } @@ -115,39 +184,89 @@ func ParseHeimdallFile(path string, wg *sync.WaitGroup, messages chan NSQMessage } else if idVal.Type() == lua.LTString { currentAchievement.ID = idVal.String() } else { - logger.Warning("Missing or invalid 'id' (expected number or string) for achievement for player %s in %q", currentPlayerName, path) + logger.Warning("Pass 1: Missing or invalid 'id' (expected number or string) for achievement for player %s in %q.", currentPlayerName, path) } dateVal := achievementTable.RawGetString("date") if dateVal.Type() == lua.LTString { currentAchievement.Date = dateVal.String() } else { - logger.Warning("Missing or invalid 'date' (expected string) for achievement for player %s in %q", currentPlayerName, path) + logger.Warning("Pass 1: Missing or invalid 'date' (expected string) for achievement for player %s in %q.", currentPlayerName, path) } completedVal := achievementTable.RawGetString("completed") if completedVal.Type() == lua.LTBool { currentAchievement.Completed = lua.LVAsBool(completedVal) } else { - logger.Warning("Missing or invalid 'completed' (expected boolean) for achievement for player %s in %q", currentPlayerName, path) + logger.Warning("Pass 1: Missing or invalid 'completed' (expected boolean) for achievement for player %s in %q.", currentPlayerName, path) } - if currentAchievement.ID != "" { // Ensure we have at least an ID before sending - logger.Info("Publishing achievement for %s: ID %s, Date: %s, Completed: %t", currentPlayerName, currentAchievement.ID, currentAchievement.Date, currentAchievement.Completed) - messages <- currentAchievement + if currentAchievement.ID != "" { // Ensure we have at least an ID before adding + filePlayerAchievements = append(filePlayerAchievements, currentAchievement) } }) }) + + if len(filePlayerAchievements) > 0 || len(filePlayerNames) > 0 { + globalDataMutex.Lock() + for _, ach := range filePlayerAchievements { + allPlayersAchievementsGlobal[ach.Name] = append(allPlayersAchievementsGlobal[ach.Name], ach) + } + for name := range filePlayerNames { + allPlayerNamesGlobal[name] = true + } + globalDataMutex.Unlock() + logger.Info("Pass 1: Extracted from %q. Players in file: %d. Achievements in file: %d.", path, len(filePlayerNames), len(filePlayerAchievements)) + } else { + logger.Info("Pass 1: No player data or names extracted from %q.", path) + } } -func NsqWorker(wg *sync.WaitGroup, messages chan NSQMessage) { +// updateLuaFileState is for Pass 2 +func updateLuaFileState(path string, wg *sync.WaitGroup, allKnownPlayerNames map[string]bool) { + defer wg.Done() + L := lua.NewState() + defer L.Close() + + if err := L.DoFile(path); err != nil { + logger.Error("Pass 2: error executing Lua file %q: %v. Cannot update its state.", path, err) + return + } + + heimdallAchievementsVal := L.GetGlobal("Heimdall_Achievements") + if heimdallAchievementsVal.Type() == lua.LTNil { + logger.Warning("Pass 2: Heimdall_Achievements not found in %q after script execution. Cannot set 'alreadySeen' or clear 'players'.", path) + return + } + + heimdallAchievementsTable, ok := heimdallAchievementsVal.(*lua.LTable) + if !ok { + logger.Warning("Pass 2: Heimdall_Achievements in %q is not a table (type: %s). Cannot update.", path, heimdallAchievementsVal.Type().String()) + return + } + + luaAlreadySeen := L.NewTable() + for name := range allKnownPlayerNames { + luaAlreadySeen.RawSetString(name, lua.LTrue) + } + + L.SetField(heimdallAchievementsTable, "alreadySeen", luaAlreadySeen) + logger.Debug("Pass 2: Set Heimdall_Achievements.alreadySeen for %q with %d total player names.", path, len(allKnownPlayerNames)) + + L.SetField(heimdallAchievementsTable, "players", L.NewTable()) + logger.Debug("Pass 2: Cleared Heimdall_Achievements.players for %q.", path) +} + +func NsqWorker(wg *sync.WaitGroup, messages <-chan NSQMessage) { // Changed to read-only channel defer wg.Done() for msg := range messages { err := Publish(msg) if err != nil { - logger.Warning("error publishing message: %v", err) + logger.Warning("error publishing message for player %s, achievement ID %s: %v", msg.Name, msg.ID, err) + // Optionally, add retry logic or dead-letter queue here continue } + logger.Debug("Successfully published achievement for %s: ID %s", msg.Name, msg.ID) } } @@ -155,11 +274,21 @@ func Publish(msg NSQMessage) error { data := bytes.Buffer{} err := json.NewEncoder(&data).Encode(msg) if err != nil { - return err + return err // Error encoding JSON } - _, err = http.Post(nsqEndpoint, "application/json", &data) + resp, err := http.Post(nsqEndpoint, "application/json", &data) if err != nil { - return err + return err // Error making HTTP POST request + } + defer resp.Body.Close() // Ensure body is closed + + if resp.StatusCode != http.StatusOK { + // Read body for more details if not OK + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + logger.Warning("Failed to read error response body from NSQ: %v", readErr) + } + return fmt.Errorf("nsq publish failed: status code %d, body: %s", resp.StatusCode, string(bodyBytes)) } return nil }