Get rid of broken nsq batching
This commit is contained in:
@@ -36,7 +36,6 @@ var (
|
||||
stage1Workers int
|
||||
stage2Workers int
|
||||
stage3Workers int
|
||||
nsqBatchSize int
|
||||
|
||||
serverPort string
|
||||
serverMode bool
|
||||
@@ -60,7 +59,6 @@ func initConfig() error {
|
||||
stage1Workers = getEnvInt("STAGE1_WORKERS", 6)
|
||||
stage2Workers = getEnvInt("STAGE2_WORKERS", 4)
|
||||
stage3Workers = getEnvInt("STAGE3_WORKERS", 4)
|
||||
nsqBatchSize = getEnvInt("NSQ_BATCH_SIZE", 5000)
|
||||
|
||||
serverPort = getEnv("SERVER_PORT", "3000")
|
||||
serverMode = getEnv("SERVER", "false") == "true"
|
||||
|
||||
@@ -19,13 +19,6 @@ import (
|
||||
func runFileReaderStage() {
|
||||
logger.Info("Starting file reader stage")
|
||||
|
||||
producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), nsq.NewConfig())
|
||||
if err != nil {
|
||||
logger.Error("Error creating producer: %v", err)
|
||||
return
|
||||
}
|
||||
defer producer.Stop()
|
||||
|
||||
killmailFiles, err := os.ReadDir("data")
|
||||
if err != nil {
|
||||
logger.Error("Failed to read data directory: %v", err)
|
||||
@@ -64,12 +57,22 @@ func runFileReaderStage() {
|
||||
WithPrefix(filepath.Base(filePath))
|
||||
fileLog.Info("Processing file")
|
||||
|
||||
config := nsq.NewConfig()
|
||||
config.WriteTimeout = 10 * time.Second
|
||||
config.DialTimeout = 5 * time.Second
|
||||
producer, err := nsq.NewProducer(fmt.Sprintf("%s:%d", nsqHost, nsqPort), config)
|
||||
if err != nil {
|
||||
fileLog.Error("Error creating producer: %v", err)
|
||||
return
|
||||
}
|
||||
defer producer.Stop()
|
||||
|
||||
handler := &FileReaderHandler{
|
||||
producer: producer,
|
||||
workerID: worker,
|
||||
}
|
||||
|
||||
err := handler.processFile(ctx, filePath)
|
||||
err = handler.processFile(ctx, filePath)
|
||||
if err != nil {
|
||||
fileLog.Error("Failed to process file: %v", err)
|
||||
return
|
||||
@@ -98,44 +101,33 @@ func (h *FileReaderHandler) processFile(ctx context.Context, filePath string) er
|
||||
|
||||
messagelog.Info("Loaded %d killmails, publishing to NSQ", len(killmails))
|
||||
|
||||
for i := 0; i < len(killmails); i += nsqBatchSize {
|
||||
published := 0
|
||||
for i, killmail := range killmails {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
end := i + nsqBatchSize
|
||||
if end > len(killmails) {
|
||||
end = len(killmails)
|
||||
}
|
||||
|
||||
batch := killmails[i:end]
|
||||
messages := make([][]byte, 0, len(batch))
|
||||
|
||||
for _, killmail := range batch {
|
||||
killmailBytes, err := json.Marshal(killmail)
|
||||
if err != nil {
|
||||
messagelog.Error("Failed to marshal killmail: %v", err)
|
||||
continue
|
||||
}
|
||||
messages = append(messages, killmailBytes)
|
||||
}
|
||||
|
||||
if len(messages) == 0 {
|
||||
killmailBytes, err := json.Marshal(killmail)
|
||||
if err != nil {
|
||||
messagelog.Error("Failed to marshal killmail: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
for {
|
||||
err = h.producer.MultiPublish("killmail-queue", messages)
|
||||
err = h.producer.Publish("killmail-queue", killmailBytes)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
messagelog.Error("Failed to publish batch, retrying: %v", err)
|
||||
messagelog.Error("Failed to publish killmail, retrying: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
messagelog.Info("Published batch of %d killmails (%d/%d)", len(messages), end, len(killmails))
|
||||
published++
|
||||
if published%1000 == 0 {
|
||||
messagelog.Info("Published %d killmails (%d/%d)", published, i+1, len(killmails))
|
||||
}
|
||||
}
|
||||
|
||||
messagelog.Info("Published %d killmails", len(killmails))
|
||||
messagelog.Info("Published %d killmails", published)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user