Files
zkill-susser/file_reader.go

136 lines
3.1 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
logger "git.site.quack-lab.dev/dave/cylogger"
utils "git.site.quack-lab.dev/dave/cyutils"
)
func runFileReaderStage() {
logger.Info("Starting file reader stage")
httpClient := &http.Client{
Timeout: 30 * time.Second,
}
killmailFiles, err := os.ReadDir("data")
if err != nil {
logger.Error("Failed to read data directory: %v", err)
return
}
var filesToProcess []string
for _, file := range killmailFiles {
if strings.HasSuffix(file.Name(), ".bz2") {
filesToProcess = append(filesToProcess, filepath.Join("data", file.Name()))
}
}
logger.Info("Found %d files to process", len(filesToProcess))
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-sigChan
logger.Info("Received signal to terminate. Initiating graceful shutdown...")
cancel()
}()
utils.WithWorkers(stage1Workers, filesToProcess, func(worker int, index int, filePath string) {
if ctx.Err() != nil {
return
}
fileLog := logger.Default.
WithPrefix(fmt.Sprintf("worker %d", worker)).
WithPrefix(fmt.Sprintf("file %d of %d", index+1, len(filesToProcess))).
WithPrefix(filepath.Base(filePath))
fileLog.Info("Processing file")
handler := &FileReaderHandler{
httpClient: httpClient,
workerID: worker,
}
err := handler.processFile(ctx, filePath)
if err != nil {
fileLog.Error("Failed to process file: %v", err)
return
}
fileLog.Info("Finished processing file")
})
logger.Info("File reader stage completed")
}
type FileReaderHandler struct {
httpClient *http.Client
workerID int
}
func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) error {
messagelog := logger.Default.WithPrefix(fmt.Sprintf("worker_%d", h.workerID)).WithPrefix(filepath.Base(filePath))
messagelog.Info("Loading killmails from %s", filePath)
killmails, err := LoadBz2Killmails(filePath)
if err != nil {
messagelog.Error("Failed to load killmails: %v", err)
return err
}
messagelog.Info("Loaded %d killmails, publishing to NSQ", len(killmails))
for _, killmail := range killmails {
if ctx.Err() != nil {
return ctx.Err()
}
killmailBytes, err := json.Marshal(killmail)
if err != nil {
messagelog.Error("Failed to marshal killmail: %v", err)
continue
}
for {
url := fmt.Sprintf("https://%s/pub?topic=killmail-queue", nsqHost)
req, err := http.NewRequest("POST", url, bytes.NewReader(killmailBytes))
if err != nil {
messagelog.Error("Failed to create request, retrying: %v", err)
time.Sleep(1 * time.Second)
continue
}
req.Header.Set("Content-Type", "application/json")
resp, err := h.httpClient.Do(req)
if err == nil && resp.StatusCode == 200 {
resp.Body.Close()
break
}
if resp != nil {
resp.Body.Close()
}
messagelog.Error("Failed to publish killmail, retrying: %v", err)
time.Sleep(1 * time.Second)
}
}
messagelog.Info("Published %d killmails", len(killmails))
return nil
}