From 8d55bcb5eeb53cb623b6d86bdec08840dbb890c4 Mon Sep 17 00:00:00 2001 From: PhatPhuckDave Date: Sun, 13 Oct 2024 23:19:37 +0200 Subject: [PATCH] Refactor everything to use nsq instead of hosting a http server The point of this is (hopefully) some sort of resiliency I do not want to lose any messages ever And I want to be able to kill this process whenever it is misbehaving Hopefully this achieves that goal --- downloader/download.go | 72 ++++++++++++++++--------------------- downloader/go.mod | 2 ++ downloader/go.sum | 4 +++ downloader/main.go | 81 ++++++++++++++++++++++++------------------ downloader/types.go | 3 -- 5 files changed, 82 insertions(+), 80 deletions(-) diff --git a/downloader/download.go b/downloader/download.go index 053a60f..82e59d7 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -12,11 +12,6 @@ import ( const OUTPUT_DIR = "C:/Users/Administrator/ytdlpVideos" -type DownloadWorker struct { - id int - input chan *DownloadTask -} - var ongoingDownloads = make(map[string]struct{}) var ongoingDownloadsMutex = &sync.Mutex{} @@ -31,42 +26,35 @@ var dl = ytdlp.New(). RecodeVideo("mp4"). ConcurrentFragments(6) -func (w *DownloadWorker) Run() { - for { - task, ok := <-w.input - if !ok { - log.Printf("DownloadWorker %d: input channel closed, exiting", w.id) - return - } - _, ongoing := ongoingDownloads[task.Url] - if ongoing { - log.Printf("DownloadWorker %d: Download %s is already ongoing", w.id, task.Url) - continue - } - ongoingDownloadsMutex.Lock() - ongoingDownloads[task.Url] = struct{}{} - ongoingDownloadsMutex.Unlock() - - log.Printf("DownloadWorker %d: Downloading %s", w.id, task.Url) - - err := beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration) - if err != nil { - log.Printf("Failed beeping with %+v", err) - } - err = beeep.Alert("Download Started", task.Url, "assets/information.png") - if err != nil { - log.Printf("Failed alerting with %+v", err) - } - - _, err = dl.Run(context.TODO(), task.Url) - if err != nil { - log.Printf("DownloadWorker %d: Failed downloading %s with %+v", w.id, task.Url, err) - continue - } - - log.Printf("DownloadWorker %d: Downloaded %s", w.id, task.Url) - ongoingDownloadsMutex.Lock() - delete(ongoingDownloads, task.Url) - ongoingDownloadsMutex.Unlock() +func Download(url string) error { + _, ongoing := ongoingDownloads[url] + if ongoing { + // return fmt.Errorf("Download %s is already ongoing", url) + return nil } + ongoingDownloadsMutex.Lock() + ongoingDownloads[url] = struct{}{} + ongoingDownloadsMutex.Unlock() + + log.Printf("Downloading %s", url) + + err := beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration) + if err != nil { + Warning.Printf("Failed beeping with %+v", err) + } + err = beeep.Alert("Download Started", url, "assets/information.png") + if err != nil { + Warning.Printf("Failed alerting with %+v", err) + } + + _, err = dl.Run(context.TODO(), url) + if err != nil { + return fmt.Errorf("failed downloading %s with %+v", url, err) + } + + log.Printf("Downloaded %s", url) + ongoingDownloadsMutex.Lock() + delete(ongoingDownloads, url) + ongoingDownloadsMutex.Unlock() + return nil } diff --git a/downloader/go.mod b/downloader/go.mod index 143c4c1..33e21e6 100644 --- a/downloader/go.mod +++ b/downloader/go.mod @@ -7,6 +7,7 @@ require ( github.com/cloudflare/circl v1.3.7 // indirect github.com/go-toast/toast v0.0.0-20190211030409-01e6764cf0a4 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/golang/snappy v0.0.1 // indirect github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d // indirect github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af // indirect golang.org/x/crypto v0.21.0 // indirect @@ -17,4 +18,5 @@ require ( github.com/gen2brain/beeep v0.0.0-20240516210008-9c006672e7f4 github.com/gorilla/websocket v1.5.3 github.com/lrstanley/go-ytdlp v0.0.0-20240616011628-f35a10876c99 + github.com/nsqio/go-nsq v1.1.0 ) diff --git a/downloader/go.sum b/downloader/go.sum index 76dc1dc..f599d4f 100644 --- a/downloader/go.sum +++ b/downloader/go.sum @@ -10,10 +10,14 @@ github.com/go-toast/toast v0.0.0-20190211030409-01e6764cf0a4 h1:qZNfIGkIANxGv/Oq github.com/go-toast/toast v0.0.0-20190211030409-01e6764cf0a4/go.mod h1:kW3HQ4UdaAyrUCSSDR4xUzBKW6O2iA4uHhk7AtyYp10= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/lrstanley/go-ytdlp v0.0.0-20240616011628-f35a10876c99 h1:ZAo7qJht9PqefOD7C0ZKQ8dEkpJeM955sYw0FtQnzvo= github.com/lrstanley/go-ytdlp v0.0.0-20240616011628-f35a10876c99/go.mod h1:75ujbafjqiJugIGw4K6o52/p8C0m/kt+DrYwgClXYT4= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d h1:VhgPp6v9qf9Agr/56bj7Y/xa04UccTW04VP0Qed4vnQ= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U= github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af h1:6yITBqGTE2lEeTPG04SN9W+iWHCRyHqlVYILiSXziwk= diff --git a/downloader/main.go b/downloader/main.go index 1de4b09..8299802 100644 --- a/downloader/main.go +++ b/downloader/main.go @@ -5,12 +5,17 @@ import ( "fmt" "io" "log" - "net/http" "os" + "os/signal" + "syscall" + "time" + + "github.com/nsqio/go-nsq" ) var Error *log.Logger var Warning *log.Logger + func init() { log.SetFlags(log.Lmicroseconds | log.Lshortfile) logFile, err := os.Create("main.log") @@ -30,50 +35,56 @@ func init() { } const DOWNLOAD_WORKERS = 4 -var downloadQueue = make(chan *DownloadTask, 100) -func enableCORS(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type") +type DLHandler struct{} - if r.Method == http.MethodOptions { - w.WriteHeader(http.StatusOK) - return - } - next.ServeHTTP(w, r) - }) -} - -func handleDownload(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) - return +func (*DLHandler) HandleMessage(message *nsq.Message) error { + log.Printf("Received message '%s' with %d attempts", message.Body, message.Attempts) + data := DownloadRequest{} + err := json.Unmarshal(message.Body, &data) + if err != nil { + Error.Printf("Error unmarshalling message: %v", err) + return err } - - var req DownloadRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - log.Printf("Error parsing JSON: %v", err) - http.Error(w, "Error parsing JSON", http.StatusBadRequest) - return + log.Printf("Downloading %s", data.Link) + err = Download(data.Link) + if err != nil { + Error.Printf("Error downloading %s: %v", data.Link, err) + return err } - - downloadQueue <- &DownloadTask{Url: req.Link} - w.WriteHeader(http.StatusOK) + message.Finish() + return nil } func main() { + config := nsq.NewConfig() + config.MaxAttempts = 5 + config.MaxInFlight = DOWNLOAD_WORKERS + config.MsgTimeout = 10 * time.Second + + consumer, err := nsq.NewConsumer("ytdqueue", "dl", config) + if err != nil { + Error.Printf("Error creating consumer: %v", err) + return + } for i := 0; i < DOWNLOAD_WORKERS; i++ { - worker := &DownloadWorker{id: i, input: downloadQueue} - go worker.Run() + consumer.AddHandler(&DLHandler{}) } - mux := http.NewServeMux() - mux.Handle("/download", enableCORS(http.HandlerFunc(handleDownload))) - log.Println("Server starting on :5000") - err := http.ListenAndServe(":5000", mux) + err = consumer.ConnectToNSQD("nsq.site.quack-lab.dev:41505") if err != nil { - log.Println("Error starting server:", err) + Error.Printf("Error connecting to nsqlookupd: %v", err) + return } + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + <-sigChan + log.Println("Received signal to terminate. Initiating graceful shutdown...") + + consumer.Stop() + <-consumer.StopChan + + log.Println("Graceful shutdown completed.") } diff --git a/downloader/types.go b/downloader/types.go index a51b886..6a91bf4 100644 --- a/downloader/types.go +++ b/downloader/types.go @@ -1,8 +1,5 @@ package main -type DownloadTask struct { - Url string -} type DownloadRequest struct { Link string `json:"link"` } \ No newline at end of file