The point of this is (hopefully) some sort of resiliency: I do not want to lose any messages, ever, and I want to be able to kill this process whenever it is misbehaving. Hopefully this achieves that goal.
		
			
				
	
	
		
			91 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			91 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package main
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"log"
 | 
						|
	"os"
 | 
						|
	"os/signal"
 | 
						|
	"syscall"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/nsqio/go-nsq"
 | 
						|
)
 | 
						|
 | 
						|
// Error logs failures with a highlighted "ERROR:" prefix. It is created in
// init and writes to main.log, stderr and stdout.
var Error *log.Logger

// Warning logs non-fatal problems with a highlighted "Warning:" prefix. It is
// created in init and writes to main.log and stdout.
var Warning *log.Logger

// init wires up logging: the standard logger goes to stdout and main.log, and
// the Error and Warning loggers are created on top of the same file.
// The process exits with status 1 if the log file cannot be opened.
func init() {
	log.SetFlags(log.Lmicroseconds | log.Lshortfile)

	// Open in append mode instead of truncating: this process is meant to be
	// killed and restarted at will, and the previous run's log is exactly what
	// is needed to diagnose why it was misbehaving.
	logFile, err := os.OpenFile("main.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
	if err != nil {
		log.Printf("Error creating log file: %v", err)
		os.Exit(1)
	}
	log.SetOutput(io.MultiWriter(os.Stdout, logFile))

	const flags = log.Lmicroseconds | log.Lshortfile
	Error = log.New(io.MultiWriter(logFile, os.Stderr, os.Stdout),
		fmt.Sprintf("%sERROR:%s ", "\033[0;101m", "\033[0m"),
		flags)
	Warning = log.New(io.MultiWriter(logFile, os.Stdout),
		fmt.Sprintf("%sWarning:%s ", "\033[0;93m", "\033[0m"),
		flags)
}
 | 
						|
 | 
						|
// DOWNLOAD_WORKERS is the number of downloads processed concurrently. main
// uses it both as the consumer's MaxInFlight and as the number of handlers
// registered on the consumer.
const DOWNLOAD_WORKERS = 4
 | 
						|
 | 
						|
// DLHandler is the NSQ message handler for download requests. It carries no
// state; all the work happens in its HandleMessage method.
type DLHandler struct{}
 | 
						|
 | 
						|
func (*DLHandler) HandleMessage(message *nsq.Message) error {
 | 
						|
	log.Printf("Received message '%s' with %d attempts", message.Body, message.Attempts)
 | 
						|
	data := DownloadRequest{}
 | 
						|
	err := json.Unmarshal(message.Body, &data)
 | 
						|
	if err != nil {
 | 
						|
		Error.Printf("Error unmarshalling message: %v", err)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	log.Printf("Downloading %s", data.Link)
 | 
						|
	err = Download(data.Link)
 | 
						|
	if err != nil {
 | 
						|
		Error.Printf("Error downloading %s: %v", data.Link, err)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	message.Finish()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func main() {
 | 
						|
	config := nsq.NewConfig()
 | 
						|
	config.MaxAttempts = 5
 | 
						|
	config.MaxInFlight = DOWNLOAD_WORKERS
 | 
						|
	config.MsgTimeout = 10 * time.Second
 | 
						|
 | 
						|
	consumer, err := nsq.NewConsumer("ytdqueue", "dl", config)
 | 
						|
	if err != nil {
 | 
						|
		Error.Printf("Error creating consumer: %v", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	for i := 0; i < DOWNLOAD_WORKERS; i++ {
 | 
						|
		consumer.AddHandler(&DLHandler{})
 | 
						|
	}
 | 
						|
 | 
						|
	err = consumer.ConnectToNSQD("nsq.site.quack-lab.dev:41505")
 | 
						|
	if err != nil {
 | 
						|
		Error.Printf("Error connecting to nsqlookupd: %v", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	sigChan := make(chan os.Signal, 1)
 | 
						|
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
 | 
						|
 | 
						|
	<-sigChan
 | 
						|
	log.Println("Received signal to terminate. Initiating graceful shutdown...")
 | 
						|
 | 
						|
	consumer.Stop()
 | 
						|
	<-consumer.StopChan
 | 
						|
 | 
						|
	log.Println("Graceful shutdown completed.")
 | 
						|
}
 |