// This program consumes download requests from an NSQ topic and passes each
// requested link to the configured downloader.
package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"ytdl/downloaders"

	logger "git.site.quack-lab.dev/dave/cylogger"
	"github.com/nsqio/go-nsq"
)

const (
	// defaultNSQDAddr is the nsqd address used when -nsqd is not supplied.
	defaultNSQDAddr = "192.168.1.123:41505"

	// nsqTopic and nsqChannel identify the queue this worker consumes from.
	nsqTopic   = "ytdqueue"
	nsqChannel = "dl"

	// maxAttempts is the number of delivery attempts allowed per message.
	maxAttempts = 5

	// msgTimeout is the message timeout configured on the consumer.
	msgTimeout = 10 * time.Second

	// touchInterval is how often an in-flight message is touched while its
	// download is running. It must stay well below msgTimeout so that a long
	// download does not let the message time out.
	touchInterval = 1 * time.Second
)

// downloader is the implementation used to fetch every requested link.
// Alternative implementations that can be swapped in:
//
//	var downloader downloaders.Downloader = &downloaders.YTDLPRawDownloader{}
//	var downloader downloaders.Downloader = &downloaders.KidaiDownloader{}
var downloader downloaders.Downloader = &downloaders.YTDLPLibDownloader{}

// DLHandler is the NSQ message handler that performs downloads. It carries no
// state, so a single value can be shared by several handler goroutines.
type DLHandler struct{}

// HandleMessage decodes a DownloadRequest from the message body and downloads
// the link it names, touching the message periodically while the download is
// in progress.
//
// It returns nil when the download succeeded or when the body could not be
// decoded (such a message is discarded, since retrying cannot help). It
// returns the downloader's error when the download failed, which makes the
// consumer requeue the message.
func (*DLHandler) HandleMessage(message *nsq.Message) error {
	messagelog := logger.Default.
		WithPrefix(fmt.Sprintf("message=%q", message.Body)).
		WithPrefix(fmt.Sprintf("attempts=%d", message.Attempts))
	messagelog.Info("Received message")

	data := DownloadRequest{}
	if err := json.Unmarshal(message.Body, &data); err != nil {
		messagelog.Error("Error unmarshalling message: %v", err)
		// A body that does not decode now will not decode on a later attempt
		// either. Returning nil finishes (drops) the message instead of
		// requeueing it until maxAttempts is exhausted.
		return nil
	}

	// The context only controls the lifetime of the touch goroutine below; it
	// is cancelled as soon as this handler returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		messagelog.Debug("Starting touch ticker")
		ticker := time.NewTicker(touchInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				messagelog.Debug("Touching message")
				message.Touch()
			case <-ctx.Done():
				return
			}
		}
	}()

	messagelog.Debug("Downloading %q", data.Link)
	if err := downloader.Download(data.Link); err != nil {
		messagelog.Error("Error downloading %s: %v", data.Link, err)
		return err
	}
	messagelog.Info("Downloaded %q", data.Link)

	// Auto-response is left enabled, so returning nil makes the consumer
	// finish the message; no explicit Finish call is needed.
	return nil
}

// main configures the NSQ consumer, registers the download handlers, connects
// to nsqd and then blocks until SIGINT or SIGTERM, at which point it stops the
// consumer and waits for it to shut down. Start-up failures exit with status 1.
func main() {
	nsqdAddr := flag.String("nsqd", defaultNSQDAddr, "address (host:port) of the nsqd instance to consume from")
	flag.Parse()
	// NOTE(review): if logger.InitFlag registers its own command-line flags it
	// probably has to run before flag.Parse — confirm against cylogger.
	logger.InitFlag()

	// Manual smoke test, kept for reference:
	// err := downloader.Download("https://www.youtube.com/watch?v=SiKjprtiPaw")
	// if err != nil {
	// 	Error.Printf("Error downloading: %v", err)
	// }
	// return

	logger.Info("Starting downloader")

	config := nsq.NewConfig()
	config.MaxAttempts = maxAttempts
	config.MaxInFlight = downloaders.DOWNLOAD_WORKERS
	config.MsgTimeout = msgTimeout

	logger.Info("Creating consumer")
	consumer, err := nsq.NewConsumer(nsqTopic, nsqChannel, config)
	if err != nil {
		logger.Error("Error creating consumer: %v", err)
		os.Exit(1)
	}

	logger.Info("Creating handlers")
	// One handler goroutine per download worker, matching MaxInFlight above.
	consumer.AddConcurrentHandlers(&DLHandler{}, downloaders.DOWNLOAD_WORKERS)

	logger.Info("Connecting to nsq at %s", *nsqdAddr)
	if err := consumer.ConnectToNSQD(*nsqdAddr); err != nil {
		logger.Error("Error connecting to nsq: %v", err)
		os.Exit(1)
	}
	logger.Info("Connected to nsq at %s", *nsqdAddr)

	// Buffered so a signal delivered before the receive below is not lost.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	logger.Info("Waiting for signal to terminate")
	<-sigChan
	logger.Info("Received signal to terminate. Initiating graceful shutdown...")

	consumer.Stop()
	<-consumer.StopChan
	logger.Info("Graceful shutdown completed.")
}