Simplify everything

This commit is contained in:
2024-07-14 14:59:12 +02:00
parent 4fd836c0b6
commit 468de9e401
4 changed files with 84 additions and 159 deletions

View File

@@ -11,7 +11,7 @@ import (
"sync" "sync"
) )
const URL = `https://youtube-download-ws-server.site.quack-lab.dev/download` const URL = `http://localhost:5000/download`
type Item struct { type Item struct {
Link string `json:"link"` Link string `json:"link"`

View File

@@ -3,53 +3,69 @@ package main
import ( import (
"context" "context"
"log" "log"
"sync"
"github.com/lrstanley/go-ytdlp"
"github.com/gen2brain/beeep" "github.com/gen2brain/beeep"
"github.com/lrstanley/go-ytdlp"
) )
const OUTPUT_DIR = "C:/Users/Administrator/ytdlpVideos" const OUTPUT_DIR = "C:/Users/Administrator/ytdlpVideos"
type DownloadWorker struct {
id int
input chan *DownloadTask
}
var ongoingDownloads = make(map[string]struct{})
var ongoingDownloadsMutex = &sync.Mutex{}
var dl = ytdlp.New(). var dl = ytdlp.New().
// FormatSort("bestvideo[ext=mp4]+bestaudio[ext=m4a]"). // FormatSort("bestvideo[ext=mp4]+bestaudio[ext=m4a]").
FormatSort("res,ext:mp4:m4a"). FormatSort("res,ext:mp4:m4a").
Output("C:/Users/Administrator/ytdlpVideos/%(uploader)s/%(title)s.%(ext)s"). Output("C:/Users/Administrator/ytdlpVideos/%(uploader)s/%(title)s.%(ext)s").
LimitRate("5M"). LimitRate("10M").
// HTTPChunkSize("20M"). // HTTPChunkSize("20M").
MarkWatched(). MarkWatched().
SponsorblockMark("all"). SponsorblockMark("all").
RecodeVideo("mp4"). RecodeVideo("mp4").
ConcurrentFragments(6) ConcurrentFragments(6)
// func Download(event PBEvent, status chan error) { func (w *DownloadWorker) Run() {
// _, err := dl.Run(context.TODO(), event.Record.Link) for {
// if err != nil { task, ok := <-w.input
// status <- err if !ok {
// return log.Printf("DownloadWorker %d: input channel closed, exiting", w.id)
// } return
}
_, ongoing := ongoingDownloads[task.Url]
if ongoing {
log.Printf("DownloadWorker %d: Download %s is already ongoing", w.id, task.Url)
continue
}
ongoingDownloadsMutex.Lock()
ongoingDownloads[task.Url] = struct{}{}
ongoingDownloadsMutex.Unlock()
// log.Printf("Downloaded %s (%s)", event.Record.Id, event.Record.Link) log.Printf("DownloadWorker %d: Downloading %s", w.id, task.Url)
// SetDownloaded(event)
// }
func DownloadURL(url string, status chan error) {
log.Printf("Downloading %s", url)
err := beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration) err := beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration)
if err != nil { if err != nil {
log.Printf("Failed beeping with %+v", err) log.Printf("Failed beeping with %+v", err)
} }
err = beeep.Alert("Download Started", url, "assets/information.png") err = beeep.Alert("Download Started", task.Url, "assets/information.png")
if err != nil { if err != nil {
log.Printf("Failed alerting with %+v", err) log.Printf("Failed alerting with %+v", err)
} }
_, err = dl.Run(context.TODO(), url) _, err = dl.Run(context.TODO(), task.Url)
if err != nil { if err != nil {
status <- err log.Printf("DownloadWorker %d: Failed downloading %s with %+v", w.id, task.Url, err)
return continue
} }
log.Printf("Downloaded %s", url) log.Printf("DownloadWorker %d: Downloaded %s", w.id, task.Url)
close(status) ongoingDownloadsMutex.Lock()
delete(ongoingDownloads, task.Url)
ongoingDownloadsMutex.Unlock()
}
} }

View File

@@ -1,123 +1,49 @@
package main package main
import ( import (
"encoding/json"
"io"
"log" "log"
"time" "net/http"
"os"
) )
const WEBSOCKET_SERVER = "ws://youtube-download-ws-server.site.quack-lab.dev/ws"
const WEBSOCKET_SERVER_ALT = "ws://localhost:8080/ws"
func init() { func init() {
// log.SetFlags(log.Lmicroseconds | log.Lshortfile) log.SetFlags(log.Lmicroseconds | log.Lshortfile)
log.SetFlags(log.Lmicroseconds) logFile, err := os.Create("main.log")
if err != nil {
log.Printf("Error creating log file: %v", err)
os.Exit(1)
}
logger := io.MultiWriter(os.Stdout, logFile)
log.SetOutput(logger)
} }
// func instrument() { const DOWNLOAD_WORKERS = 10
// numGoroutines := runtime.NumGoroutine()
// var m runtime.MemStats
// runtime.ReadMemStats(&m)
// // log.Printf("%+v", m)
// sys := float64(m.Sys)
// ramUsedMB := sys / 1024 / 1024
// kbPerGoroutine := sys / 1024 / float64(numGoroutines)
// var numGoroutinesPretty string
// switch {
// case numGoroutines >= 1_000_000:
// numGoroutinesPretty = fmt.Sprintf("%.2fM", float64(numGoroutines)/1_000_000)
// case numGoroutines >= 1_000:
// numGoroutinesPretty = fmt.Sprintf("%.2fk", float64(numGoroutines)/1_000)
// default:
// numGoroutinesPretty = fmt.Sprintf("%d", numGoroutines)
// }
// log.Printf("Number of active goroutines: %d (%s); RAM used: %.2f MB; KB per goroutine: %.2f", numGoroutines, numGoroutinesPretty, ramUsedMB, kbPerGoroutine)
// }
func main() { func main() {
// go func() { downloadQueue := make(chan *DownloadTask, 100)
// for { for i := 0; i < DOWNLOAD_WORKERS; i++ {
// instrument() worker := &DownloadWorker{id: i, input: downloadQueue}
// time.Sleep(1 * time.Second) go worker.Run()
// }
// }()
// res, err := http.Get(FULL_URL)
// if err != nil {
// log.Fatal(err)
// }
// defer res.Body.Close()
// body, err := io.ReadAll(res.Body)
// if err != nil {
// log.Printf("Error reading response body: %+v\n", err)
// return
// }
// if res.StatusCode != http.StatusOK {
// log.Printf("Non-OK HTTP status: %d\nResponse body: %s\n", res.StatusCode, body)
// return
// }
// var data APIResponse
// err = json.Unmarshal(body, &data)
// if err != nil {
// log.Printf("Error unmarshaling JSON: %+v\n", err)
// return
// }
// log.Printf("Data: %+v\n", data)
// listener := new(RealtimeListener)
// listener.Url = POCKETBASE_REALTIME
// listener.Collections = []string{COLLECTION_NAME}
// listener.initialize()
var ws WSConnection
read := make(chan string)
go func() {
for {
ws = WSConnection{
url: WEBSOCKET_SERVER_ALT,
} }
ws.Open()
for {
msg, ok := <-ws.ReadChan
if !ok {
break
}
read <- msg
}
<-ws.Dead
log.Printf("Reconnecting in 5 seconds...")
time.Sleep(5 * time.Second)
}
}()
sem := make(chan struct{}, 4) http.HandleFunc("/download", func(responseWriter http.ResponseWriter, request *http.Request) {
for { defer request.Body.Close()
select {
case event := <-read:
eventCopy := event
status := make(chan error)
sem <- struct{}{}
log.Printf("New event: %+v; semaphore at: %d", eventCopy, len(sem)) req := DownloadRequest{}
go func() { err := json.NewDecoder(request.Body).Decode(&req)
defer func() { if err != nil {
<-sem log.Printf("Error parsing JSON: %v", err)
log.Printf("Semaphore at: %d", len(sem)) http.Error(responseWriter, "Error parsing JSON", http.StatusBadRequest)
}() return
// Download(eventCopy, status)
DownloadURL(eventCopy, status)
// go DownloadNative(event, status)
for status := range status {
log.Printf("Status: %s\n", status)
}
}()
case <-time.After(1 * time.Minute):
// Perform some action or simply continue to avoid deadlock
log.Println("Consumer is alive, but has no new events.")
} }
downloadQueue <- &DownloadTask{Url: req.Link}
})
log.Println("Server starting on :5000")
err := http.ListenAndServe(":5000", nil)
if err != nil {
log.Println("Error starting server:", err)
} }
} }

View File

@@ -1,25 +1,8 @@
package main package main
type APIResponse struct { type DownloadTask struct {
Page int `json:"page"` Url string
PerPage int `json:"perPage"`
TotalItems int `json:"totalItems"`
TotalPages int `json:"totalPages"`
Items []APIItem `json:"items"`
} }
type DownloadRequest struct {
type APIItem struct {
CollectionId string `json:"collectionId"`
CollectionName string `json:"collectionName"`
Created string `json:"created"`
Downloaded bool `json:"downloaded"`
Id string `json:"id"`
Link string `json:"link"` Link string `json:"link"`
Updated string `json:"updated"`
}
type PBEvent struct {
ClientId string `json:"clientId"`
Action string `json:"action"`
Record APIItem `json:"record"`
} }