Refactor main.go to enhance data extraction and NSQ message handling; added debug logging and improved error handling.
This commit is contained in:
203
main.go
203
main.go
@@ -4,6 +4,8 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@@ -20,14 +22,20 @@ import (
|
|||||||
const nsqEndpoint = "https://nsq.site.quack-lab.dev/pub?topic=wowspy"
|
const nsqEndpoint = "https://nsq.site.quack-lab.dev/pub?topic=wowspy"
|
||||||
|
|
||||||
var debug *bool
|
var debug *bool
|
||||||
var messageQueue = make(chan string, 10000)
|
|
||||||
var nsqWorkers = 32
|
var nsqWorkers = 32
|
||||||
|
|
||||||
|
var allPlayersAchievementsGlobal = make(map[string][]NSQMessage) // PlayerName -> list of all their achievements
|
||||||
|
var allPlayerNamesGlobal = make(map[string]bool) // Set of all player names
|
||||||
|
var globalDataMutex = &sync.Mutex{}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
root := flag.String("root", ".", "Root workdir")
|
root := flag.String("root", ".", "Root workdir")
|
||||||
debug = flag.Bool("d", false, "Debug")
|
debug = flag.Bool("d", false, "Debug")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
logger.InitFlag()
|
logger.InitFlag()
|
||||||
|
if *debug {
|
||||||
|
logger.SetLevel(logger.LevelDebug) // Assuming LevelDebug is the correct constant for cylogger
|
||||||
|
}
|
||||||
|
|
||||||
logger.Info("Root: %q", *root)
|
logger.Info("Root: %q", *root)
|
||||||
cleanedRoot := strings.Replace(*root, "~", os.Getenv("HOME"), 1)
|
cleanedRoot := strings.Replace(*root, "~", os.Getenv("HOME"), 1)
|
||||||
@@ -45,65 +53,126 @@ func main() {
|
|||||||
logger.Error("error matching Heimdall.lua: %v", err)
|
logger.Error("error matching Heimdall.lua: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Info("Found %d Heimdall.lua", len(matches))
|
logger.Info("Found %d Heimdall.lua files.", len(matches))
|
||||||
|
if len(matches) == 0 {
|
||||||
wg := sync.WaitGroup{}
|
logger.Info("No Heimdall.lua files found. Exiting.")
|
||||||
messages := make(chan NSQMessage, 1000)
|
return
|
||||||
for i := 0; i < nsqWorkers; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go NsqWorker(&wg, messages)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Debugging
|
||||||
|
matches = matches[:2]
|
||||||
|
|
||||||
|
// --- Pass 1: Extract all data ---
|
||||||
|
logger.Info("Starting Pass 1: Extracting data from all Heimdall.lua files...")
|
||||||
|
var wgPass1 sync.WaitGroup
|
||||||
for _, match := range matches {
|
for _, match := range matches {
|
||||||
logger.Info("Processing %q", match)
|
wgPass1.Add(1)
|
||||||
wg.Add(1)
|
go extractPlayerAchievementsFromFile(filepath.Join(cleanedRoot, match), &wgPass1)
|
||||||
go ParseHeimdallFile(filepath.Join(cleanedRoot, match), &wg, messages)
|
|
||||||
}
|
}
|
||||||
for msg := range messages {
|
wgPass1.Wait()
|
||||||
logger.Info("Message: %#v", msg)
|
logger.Info("Finished Pass 1: Extracted data for %d unique players globally from %d files.", len(allPlayerNamesGlobal), len(matches))
|
||||||
|
if *debug {
|
||||||
|
globalDataMutex.Lock()
|
||||||
|
logger.Debug("Total achievements collected globally: %d", countTotalAchievements(allPlayersAchievementsGlobal))
|
||||||
|
globalDataMutex.Unlock()
|
||||||
}
|
}
|
||||||
wg.Wait()
|
|
||||||
close(messages)
|
// --- Process and Send to NSQ ---
|
||||||
return
|
logger.Info("Starting NSQ message publishing...")
|
||||||
|
nsqMessagesChan := make(chan NSQMessage, 10000) // Increased buffer size
|
||||||
|
var wgNsqWorkers sync.WaitGroup
|
||||||
|
for i := 0; i < nsqWorkers; i++ {
|
||||||
|
wgNsqWorkers.Add(1)
|
||||||
|
go NsqWorker(&wgNsqWorkers, nsqMessagesChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
globalDataMutex.Lock()
|
||||||
|
defer globalDataMutex.Unlock()
|
||||||
|
for playerName, achList := range allPlayersAchievementsGlobal {
|
||||||
|
for _, ach := range achList {
|
||||||
|
// ach.Name is already correctly set during extraction
|
||||||
|
nsqMessagesChan <- ach
|
||||||
|
logger.Debug("Queued NSQ message for Player: %s, AchID: %s", playerName, ach.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(nsqMessagesChan) // Close channel when all messages are sent
|
||||||
|
logger.Info("All NSQ messages queued.")
|
||||||
|
}()
|
||||||
|
|
||||||
|
// --- Pass 2: Update Lua file states (in memory) ---
|
||||||
|
logger.Info("Starting Pass 2: Updating Lua states (setting alreadySeen and clearing players)...")
|
||||||
|
var wgPass2 sync.WaitGroup
|
||||||
|
if len(allPlayerNamesGlobal) > 0 { // Only run pass 2 if there are players to report
|
||||||
|
for _, match := range matches {
|
||||||
|
wgPass2.Add(1)
|
||||||
|
go updateLuaFileState(filepath.Join(cleanedRoot, match), &wgPass2, allPlayerNamesGlobal)
|
||||||
|
}
|
||||||
|
wgPass2.Wait()
|
||||||
|
logger.Info("Finished Pass 2: Lua states updated where applicable.")
|
||||||
|
} else {
|
||||||
|
logger.Info("Skipping Pass 2 as no players were found globally.")
|
||||||
|
}
|
||||||
|
|
||||||
|
wgNsqWorkers.Wait() // Wait for all NSQ messages to be processed
|
||||||
|
logger.Info("All NSQ workers finished. Program complete.")
|
||||||
}
|
}
|
||||||
|
|
||||||
func ParseHeimdallFile(path string, wg *sync.WaitGroup, messages chan NSQMessage) {
|
// Helper function to count total achievements for debugging
|
||||||
|
func countTotalAchievements(achMap map[string][]NSQMessage) int {
|
||||||
|
count := 0
|
||||||
|
for _, achList := range achMap {
|
||||||
|
count += len(achList)
|
||||||
|
}
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractPlayerAchievementsFromFile is for Pass 1
|
||||||
|
func extractPlayerAchievementsFromFile(path string, wg *sync.WaitGroup) {
|
||||||
|
logger.Info("Extracting achievements from %q", path)
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
L := lua.NewState()
|
L := lua.NewState()
|
||||||
defer L.Close()
|
defer L.Close()
|
||||||
|
|
||||||
if err := L.DoFile(path); err != nil {
|
if err := L.DoFile(path); err != nil {
|
||||||
logger.Error("error executing Lua file %q: %v", path, err)
|
logger.Error("Pass 1: error executing Lua file %q: %v", path, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
heimdallAchievements := L.GetGlobal("Heimdall_Achievements")
|
heimdallAchievements := L.GetGlobal("Heimdall_Achievements")
|
||||||
if heimdallAchievements.Type() == lua.LTNil {
|
if heimdallAchievements.Type() == lua.LTNil {
|
||||||
logger.Error("Heimdall_Achievements not found in %q", path)
|
logger.Warning("Pass 1: Heimdall_Achievements not found in %q. Skipping file.", path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
playersTableLua := L.GetField(heimdallAchievements, "players")
|
playersTableLua := L.GetField(heimdallAchievements, "players")
|
||||||
|
if playersTableLua.Type() == lua.LTNil {
|
||||||
|
logger.Info("Pass 1: 'players' table is nil in Heimdall_Achievements in %q. No player data to extract.", path)
|
||||||
|
return
|
||||||
|
}
|
||||||
playersTable, ok := playersTableLua.(*lua.LTable)
|
playersTable, ok := playersTableLua.(*lua.LTable)
|
||||||
if !ok || playersTableLua.Type() == lua.LTNil {
|
if !ok {
|
||||||
logger.Error("'players' table not found or not a table in Heimdall_Achievements in %q", path)
|
logger.Warning("Pass 1: 'players' field in Heimdall_Achievements is not a table in %q (type: %s). Skipping.", path, playersTableLua.Type().String())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var filePlayerAchievements []NSQMessage // Temporary list for this file's achievements
|
||||||
|
var filePlayerNames = make(map[string]bool) // Temporary set for this file's player names
|
||||||
|
|
||||||
playersTable.ForEach(func(playerNameLua lua.LValue, playerAchievementsLua lua.LValue) {
|
playersTable.ForEach(func(playerNameLua lua.LValue, playerAchievementsLua lua.LValue) {
|
||||||
currentPlayerName := playerNameLua.String()
|
currentPlayerName := playerNameLua.String()
|
||||||
logger.Info("Processing player: %s in %q", currentPlayerName, path)
|
filePlayerNames[currentPlayerName] = true // Track name
|
||||||
|
|
||||||
achievementsTable, ok := playerAchievementsLua.(*lua.LTable)
|
achievementsTableLua, ok := playerAchievementsLua.(*lua.LTable)
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.Error("Achievements for player %s is not a table in %q", currentPlayerName, path)
|
logger.Error("Pass 1: Achievements for player %s is not a table in %q. Skipping achievements for this player.", currentPlayerName, path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
achievementsTable.ForEach(func(_ lua.LValue, achievementDataLua lua.LValue) {
|
achievementsTableLua.ForEach(func(_ lua.LValue, achievementDataLua lua.LValue) {
|
||||||
achievementTable, ok := achievementDataLua.(*lua.LTable)
|
achievementTable, ok := achievementDataLua.(*lua.LTable)
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.Error("Achievement data for player %s is not a table in %q", currentPlayerName, path)
|
logger.Error("Pass 1: Achievement data for player %s is not a table in %q. Skipping this achievement.", currentPlayerName, path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,39 +184,89 @@ func ParseHeimdallFile(path string, wg *sync.WaitGroup, messages chan NSQMessage
|
|||||||
} else if idVal.Type() == lua.LTString {
|
} else if idVal.Type() == lua.LTString {
|
||||||
currentAchievement.ID = idVal.String()
|
currentAchievement.ID = idVal.String()
|
||||||
} else {
|
} else {
|
||||||
logger.Warning("Missing or invalid 'id' (expected number or string) for achievement for player %s in %q", currentPlayerName, path)
|
logger.Warning("Pass 1: Missing or invalid 'id' (expected number or string) for achievement for player %s in %q.", currentPlayerName, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
dateVal := achievementTable.RawGetString("date")
|
dateVal := achievementTable.RawGetString("date")
|
||||||
if dateVal.Type() == lua.LTString {
|
if dateVal.Type() == lua.LTString {
|
||||||
currentAchievement.Date = dateVal.String()
|
currentAchievement.Date = dateVal.String()
|
||||||
} else {
|
} else {
|
||||||
logger.Warning("Missing or invalid 'date' (expected string) for achievement for player %s in %q", currentPlayerName, path)
|
logger.Warning("Pass 1: Missing or invalid 'date' (expected string) for achievement for player %s in %q.", currentPlayerName, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
completedVal := achievementTable.RawGetString("completed")
|
completedVal := achievementTable.RawGetString("completed")
|
||||||
if completedVal.Type() == lua.LTBool {
|
if completedVal.Type() == lua.LTBool {
|
||||||
currentAchievement.Completed = lua.LVAsBool(completedVal)
|
currentAchievement.Completed = lua.LVAsBool(completedVal)
|
||||||
} else {
|
} else {
|
||||||
logger.Warning("Missing or invalid 'completed' (expected boolean) for achievement for player %s in %q", currentPlayerName, path)
|
logger.Warning("Pass 1: Missing or invalid 'completed' (expected boolean) for achievement for player %s in %q.", currentPlayerName, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
if currentAchievement.ID != "" { // Ensure we have at least an ID before sending
|
if currentAchievement.ID != "" { // Ensure we have at least an ID before adding
|
||||||
logger.Info("Publishing achievement for %s: ID %s, Date: %s, Completed: %t", currentPlayerName, currentAchievement.ID, currentAchievement.Date, currentAchievement.Completed)
|
filePlayerAchievements = append(filePlayerAchievements, currentAchievement)
|
||||||
messages <- currentAchievement
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if len(filePlayerAchievements) > 0 || len(filePlayerNames) > 0 {
|
||||||
|
globalDataMutex.Lock()
|
||||||
|
for _, ach := range filePlayerAchievements {
|
||||||
|
allPlayersAchievementsGlobal[ach.Name] = append(allPlayersAchievementsGlobal[ach.Name], ach)
|
||||||
|
}
|
||||||
|
for name := range filePlayerNames {
|
||||||
|
allPlayerNamesGlobal[name] = true
|
||||||
|
}
|
||||||
|
globalDataMutex.Unlock()
|
||||||
|
logger.Info("Pass 1: Extracted from %q. Players in file: %d. Achievements in file: %d.", path, len(filePlayerNames), len(filePlayerAchievements))
|
||||||
|
} else {
|
||||||
|
logger.Info("Pass 1: No player data or names extracted from %q.", path)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NsqWorker(wg *sync.WaitGroup, messages chan NSQMessage) {
|
// updateLuaFileState is for Pass 2
|
||||||
|
func updateLuaFileState(path string, wg *sync.WaitGroup, allKnownPlayerNames map[string]bool) {
|
||||||
|
defer wg.Done()
|
||||||
|
L := lua.NewState()
|
||||||
|
defer L.Close()
|
||||||
|
|
||||||
|
if err := L.DoFile(path); err != nil {
|
||||||
|
logger.Error("Pass 2: error executing Lua file %q: %v. Cannot update its state.", path, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
heimdallAchievementsVal := L.GetGlobal("Heimdall_Achievements")
|
||||||
|
if heimdallAchievementsVal.Type() == lua.LTNil {
|
||||||
|
logger.Warning("Pass 2: Heimdall_Achievements not found in %q after script execution. Cannot set 'alreadySeen' or clear 'players'.", path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
heimdallAchievementsTable, ok := heimdallAchievementsVal.(*lua.LTable)
|
||||||
|
if !ok {
|
||||||
|
logger.Warning("Pass 2: Heimdall_Achievements in %q is not a table (type: %s). Cannot update.", path, heimdallAchievementsVal.Type().String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
luaAlreadySeen := L.NewTable()
|
||||||
|
for name := range allKnownPlayerNames {
|
||||||
|
luaAlreadySeen.RawSetString(name, lua.LTrue)
|
||||||
|
}
|
||||||
|
|
||||||
|
L.SetField(heimdallAchievementsTable, "alreadySeen", luaAlreadySeen)
|
||||||
|
logger.Debug("Pass 2: Set Heimdall_Achievements.alreadySeen for %q with %d total player names.", path, len(allKnownPlayerNames))
|
||||||
|
|
||||||
|
L.SetField(heimdallAchievementsTable, "players", L.NewTable())
|
||||||
|
logger.Debug("Pass 2: Cleared Heimdall_Achievements.players for %q.", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NsqWorker(wg *sync.WaitGroup, messages <-chan NSQMessage) { // Changed to read-only channel
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for msg := range messages {
|
for msg := range messages {
|
||||||
err := Publish(msg)
|
err := Publish(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warning("error publishing message: %v", err)
|
logger.Warning("error publishing message for player %s, achievement ID %s: %v", msg.Name, msg.ID, err)
|
||||||
|
// Optionally, add retry logic or dead-letter queue here
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
logger.Debug("Successfully published achievement for %s: ID %s", msg.Name, msg.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -155,11 +274,21 @@ func Publish(msg NSQMessage) error {
|
|||||||
data := bytes.Buffer{}
|
data := bytes.Buffer{}
|
||||||
err := json.NewEncoder(&data).Encode(msg)
|
err := json.NewEncoder(&data).Encode(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err // Error encoding JSON
|
||||||
}
|
}
|
||||||
_, err = http.Post(nsqEndpoint, "application/json", &data)
|
resp, err := http.Post(nsqEndpoint, "application/json", &data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err // Error making HTTP POST request
|
||||||
|
}
|
||||||
|
defer resp.Body.Close() // Ensure body is closed
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
// Read body for more details if not OK
|
||||||
|
bodyBytes, readErr := io.ReadAll(resp.Body)
|
||||||
|
if readErr != nil {
|
||||||
|
logger.Warning("Failed to read error response body from NSQ: %v", readErr)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("nsq publish failed: status code %d, body: %s", resp.StatusCode, string(bodyBytes))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user