diff --git a/dl/dl.go b/dl/dl.go index 14e61eb..57ab548 100644 --- a/dl/dl.go +++ b/dl/dl.go @@ -11,7 +11,7 @@ import ( "sync" ) -const URL = `https://youtube-download-ws-server.site.quack-lab.dev/download` +const URL = `http://localhost:5000/download` type Item struct { Link string `json:"link"` diff --git a/downloader/download.go b/downloader/download.go index 0825132..105f37d 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -3,53 +3,69 @@ package main import ( "context" "log" + "sync" - "github.com/lrstanley/go-ytdlp" "github.com/gen2brain/beeep" + "github.com/lrstanley/go-ytdlp" ) const OUTPUT_DIR = "C:/Users/Administrator/ytdlpVideos" +type DownloadWorker struct { + id int + input chan *DownloadTask +} + +var ongoingDownloads = make(map[string]struct{}) +var ongoingDownloadsMutex = &sync.Mutex{} + var dl = ytdlp.New(). // FormatSort("bestvideo[ext=mp4]+bestaudio[ext=m4a]"). FormatSort("res,ext:mp4:m4a"). Output("C:/Users/Administrator/ytdlpVideos/%(uploader)s/%(title)s.%(ext)s"). - LimitRate("5M"). + LimitRate("10M"). // HTTPChunkSize("20M"). MarkWatched(). SponsorblockMark("all"). RecodeVideo("mp4"). ConcurrentFragments(6) -// func Download(event PBEvent, status chan error) { -// _, err := dl.Run(context.TODO(), event.Record.Link) -// if err != nil { -// status <- err -// return -// } +func (w *DownloadWorker) Run() { + for { + task, ok := <-w.input + if !ok { + log.Printf("DownloadWorker %d: input channel closed, exiting", w.id) + return + } + _, ongoing := ongoingDownloads[task.Url] + if ongoing { + log.Printf("DownloadWorker %d: Download %s is already ongoing", w.id, task.Url) + continue + } + ongoingDownloadsMutex.Lock() + ongoingDownloads[task.Url] = struct{}{} + ongoingDownloadsMutex.Unlock() -// log.Printf("Downloaded %s (%s)", event.Record.Id, event.Record.Link) -// SetDownloaded(event) -// } + log.Printf("DownloadWorker %d: Downloading %s", w.id, task.Url) -func DownloadURL(url string, status chan error) { - log.Printf("Downloading %s", url) - - err := beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration) - if err != nil { - log.Printf("Failed beeping with %+v", err) + err := beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration) + if err != nil { + log.Printf("Failed beeping with %+v", err) + } + err = beeep.Alert("Download Started", task.Url, "assets/information.png") + if err != nil { + log.Printf("Failed alerting with %+v", err) + } + + _, err = dl.Run(context.TODO(), task.Url) + if err != nil { + log.Printf("DownloadWorker %d: Failed downloading %s with %+v", w.id, task.Url, err) + continue + } + + log.Printf("DownloadWorker %d: Downloaded %s", w.id, task.Url) + ongoingDownloadsMutex.Lock() + delete(ongoingDownloads, task.Url) + ongoingDownloadsMutex.Unlock() } - err = beeep.Alert("Download Started", url, "assets/information.png") - if err != nil { - log.Printf("Failed alerting with %+v", err) - } - - _, err = dl.Run(context.TODO(), url) - if err != nil { - status <- err - return - } - - log.Printf("Downloaded %s", url) - close(status) } diff --git a/downloader/main.go b/downloader/main.go index a971674..cd08f73 100644 --- a/downloader/main.go +++ b/downloader/main.go @@ -1,123 +1,49 @@ package main import ( + "encoding/json" + "io" "log" - "time" + "net/http" + "os" ) -const WEBSOCKET_SERVER = "ws://youtube-download-ws-server.site.quack-lab.dev/ws" -const WEBSOCKET_SERVER_ALT = "ws://localhost:8080/ws" - func init() { - // log.SetFlags(log.Lmicroseconds | log.Lshortfile) - log.SetFlags(log.Lmicroseconds) + log.SetFlags(log.Lmicroseconds | log.Lshortfile) + logFile, err := os.Create("main.log") + if err != nil { + log.Printf("Error creating log file: %v", err) + os.Exit(1) + } + logger := io.MultiWriter(os.Stdout, logFile) + log.SetOutput(logger) } -// func instrument() { -// numGoroutines := runtime.NumGoroutine() - -// var m runtime.MemStats -// runtime.ReadMemStats(&m) -// // log.Printf("%+v", m) - -// sys := float64(m.Sys) -// ramUsedMB := sys / 1024 / 1024 -// kbPerGoroutine := sys / 1024 / float64(numGoroutines) - -// var numGoroutinesPretty string -// switch { -// case numGoroutines >= 1_000_000: -// numGoroutinesPretty = fmt.Sprintf("%.2fM", float64(numGoroutines)/1_000_000) -// case numGoroutines >= 1_000: -// numGoroutinesPretty = fmt.Sprintf("%.2fk", float64(numGoroutines)/1_000) -// default: -// numGoroutinesPretty = fmt.Sprintf("%d", numGoroutines) -// } - -// log.Printf("Number of active goroutines: %d (%s); RAM used: %.2f MB; KB per goroutine: %.2f", numGoroutines, numGoroutinesPretty, ramUsedMB, kbPerGoroutine) -// } +const DOWNLOAD_WORKERS = 10 func main() { - // go func() { - // for { - // instrument() - // time.Sleep(1 * time.Second) - // } - // }() + downloadQueue := make(chan *DownloadTask, 100) + for i := 0; i < DOWNLOAD_WORKERS; i++ { + worker := &DownloadWorker{id: i, input: downloadQueue} + go worker.Run() + } - // res, err := http.Get(FULL_URL) - // if err != nil { - // log.Fatal(err) - // } - // defer res.Body.Close() - // body, err := io.ReadAll(res.Body) - // if err != nil { - // log.Printf("Error reading response body: %+v\n", err) - // return - // } - // if res.StatusCode != http.StatusOK { - // log.Printf("Non-OK HTTP status: %d\nResponse body: %s\n", res.StatusCode, body) - // return - // } + http.HandleFunc("/download", func(responseWriter http.ResponseWriter, request *http.Request) { + defer request.Body.Close() - // var data APIResponse - // err = json.Unmarshal(body, &data) - // if err != nil { - // log.Printf("Error unmarshaling JSON: %+v\n", err) - // return - // } - // log.Printf("Data: %+v\n", data) - - // listener := new(RealtimeListener) - // listener.Url = POCKETBASE_REALTIME - // listener.Collections = []string{COLLECTION_NAME} - // listener.initialize() - - var ws WSConnection - read := make(chan string) - go func() { - for { - ws = WSConnection{ - url: WEBSOCKET_SERVER_ALT, - } - ws.Open() - for { - msg, ok := <-ws.ReadChan - if !ok { - break - } - read <- msg - } - <-ws.Dead - log.Printf("Reconnecting in 5 seconds...") - time.Sleep(5 * time.Second) + req := DownloadRequest{} + err := json.NewDecoder(request.Body).Decode(&req) + if err != nil { + log.Printf("Error parsing JSON: %v", err) + http.Error(responseWriter, "Error parsing JSON", http.StatusBadRequest) + return } - }() - sem := make(chan struct{}, 4) - for { - select { - case event := <-read: - eventCopy := event - status := make(chan error) - sem <- struct{}{} - - log.Printf("New event: %+v; semaphore at: %d", eventCopy, len(sem)) - go func() { - defer func() { - <-sem - log.Printf("Semaphore at: %d", len(sem)) - }() - // Download(eventCopy, status) - DownloadURL(eventCopy, status) - // go DownloadNative(event, status) - for status := range status { - log.Printf("Status: %s\n", status) - } - }() - case <-time.After(1 * time.Minute): - // Perform some action or simply continue to avoid deadlock - log.Println("Consumer is alive, but has no new events.") - } + downloadQueue <- &DownloadTask{Url: req.Link} + }) + log.Println("Server starting on :5000") + err := http.ListenAndServe(":5000", nil) + if err != nil { + log.Println("Error starting server:", err) } } diff --git a/downloader/types.go b/downloader/types.go index 884eab2..a51b886 100644 --- a/downloader/types.go +++ b/downloader/types.go @@ -1,25 +1,8 @@ package main -type APIResponse struct { - Page int `json:"page"` - PerPage int `json:"perPage"` - TotalItems int `json:"totalItems"` - TotalPages int `json:"totalPages"` - Items []APIItem `json:"items"` -} - -type APIItem struct { - CollectionId string `json:"collectionId"` - CollectionName string `json:"collectionName"` - Created string `json:"created"` - Downloaded bool `json:"downloaded"` - Id string `json:"id"` - Link string `json:"link"` - Updated string `json:"updated"` -} - -type PBEvent struct { - ClientId string `json:"clientId"` - Action string `json:"action"` - Record APIItem `json:"record"` +type DownloadTask struct { + Url string } +type DownloadRequest struct { + Link string `json:"link"` +} \ No newline at end of file