package main import ( "bytes" "encoding/json" "fmt" "io" "net/http" "sync" "time" logger "git.site.quack-lab.dev/dave/cylogger" ) const ( batchSize = 1000 timeout = 5 * time.Second ) var ( batchBuffer = make([]NSQMessage, 0, batchSize) batchMutex sync.Mutex ) var whitelistedAchievements = map[string]bool{ "15": true, "958": true, "1276": true, "2088": true, "2151": true, "5466": true, "5759": true, "6470": true, "6763": true, "7392": true, "7393": true, "7394": true, "7958": true, "8939": true, "8992": true, "9048": true, "94103": true, "10059": true, "10079": true, "10278": true, "10657": true, "10672": true, "10684": true, "10688": true, "10689": true, "10692": true, "10693": true, "10698": true, "10790": true, "10875": true, "11124": true, "11126": true, "11127": true, "11128": true, "11157": true, "11164": true, "11188": true, "11189": true, "11190": true, "11446": true, "11473": true, "11610": true, "11674": true, "11992": true, "11993": true, "11994": true, "11995": true, "11996": true, "11997": true, "11998": true, "11999": true, "12000": true, "12001": true, "12026": true, "12074": true, "12445": true, "12447": true, "12448": true, } func Save(message NSQMessage) error { _, ok := whitelistedAchievements[message.ID] if !ok { logger.Debug("Received message for non-whitelisted achievement %s", message.ID) return nil } batchMutex.Lock() batchBuffer = append(batchBuffer, message) currentBatchSize := len(batchBuffer) batchMutex.Unlock() logger.Debug("Added achievement to batch. Current batch size: %d/%d", currentBatchSize, batchSize) if currentBatchSize >= batchSize { batchMutex.Lock() batch := make([]NSQMessage, len(batchBuffer)) copy(batch, batchBuffer) batchBuffer = batchBuffer[:0] batchMutex.Unlock() logger.Info("Batch size reached %d, sending batch to %s", len(batch), backendEndpoint) return sendBatch(batch) } return nil } // Flush sends any remaining achievements in the buffer func Flush() error { batchMutex.Lock() if len(batchBuffer) == 0 { batchMutex.Unlock() return nil } batch := make([]NSQMessage, len(batchBuffer)) copy(batch, batchBuffer) batchBuffer = batchBuffer[:0] batchMutex.Unlock() logger.Info("Flushing final batch of %d achievements to %s", len(batch), backendEndpoint) return sendBatch(batch) } func sendBatch(batch []NSQMessage) error { logger.Debug("Preparing to send batch of %d achievements", len(batch)) data, err := json.Marshal(batch) if err != nil { logger.Error("Failed to marshal batch: %v", err) return fmt.Errorf("error marshaling batch: %v", err) } logger.Debug("Successfully marshaled batch to JSON, size: %d bytes", len(data)) client := &http.Client{ Timeout: timeout, } req, err := http.NewRequest("POST", backendEndpoint, bytes.NewBuffer(data)) if err != nil { logger.Error("Failed to create HTTP request: %v", err) return fmt.Errorf("error creating request: %v", err) } req.Header.Set("Content-Type", "application/json") logger.Debug("Created HTTP request to %s", backendEndpoint) resp, err := client.Do(req) if err != nil { logger.Error("Failed to send HTTP request: %v", err) return fmt.Errorf("error sending batch: %v", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { logger.Error("Failed to read response body: %v", err) } if resp.StatusCode != http.StatusOK { logger.Error("Received non-OK status code: %d, body: %s", resp.StatusCode, string(body)) return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body)) } logger.Info("Successfully sent batch of %d achievements", len(batch)) return nil }