diff --git a/downloader/download.go b/downloader/download.go index a0147cf..c4c725c 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -30,3 +30,14 @@ func Download(event PBEvent, status chan error) { log.Printf("Downloaded %s (%s)", event.Record.Id, event.Record.Link) SetDownloaded(event) } + +func DownloadURL(url string, status chan error) { + log.Printf("Downloading %s", url) + _, err := dl.Run(context.TODO(), url) + if err != nil { + status <- err + return + } + + log.Printf("Downloaded %s", url) +} diff --git a/downloader/go.mod b/downloader/go.mod index fa0d15d..5157924 100644 --- a/downloader/go.mod +++ b/downloader/go.mod @@ -20,4 +20,7 @@ require ( gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect ) -require github.com/lrstanley/go-ytdlp v0.0.0-20240616011628-f35a10876c99 +require ( + github.com/gorilla/websocket v1.5.3 + github.com/lrstanley/go-ytdlp v0.0.0-20240616011628-f35a10876c99 +) diff --git a/downloader/go.sum b/downloader/go.sum index 445796d..1b4075d 100644 --- a/downloader/go.sum +++ b/downloader/go.sum @@ -20,6 +20,8 @@ github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8 github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg= github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= diff --git a/downloader/main.go b/downloader/main.go index 145fbd0..d51217e 100644 --- a/downloader/main.go +++ b/downloader/main.go @@ -9,6 +9,8 @@ const POCKETBASE_URL = `https://pocketbase-scratch.site.quack-lab.dev/api/collec const POCKETBASE_REALTIME = `https://pocketbase-scratch.site.quack-lab.dev/api/realtime` const COLLECTION_NAME = "youtubedownload" const FULL_URL = POCKETBASE_URL + "/" + COLLECTION_NAME + "/records" +const WEBSOCKET_SERVER = "ws://youtube-download-ws-server.site.quack-lab.dev/ws" +const WEBSOCKET_SERVER_ALT = "ws://localhost:8080/ws" func main() { log.SetFlags(log.Lmicroseconds) @@ -37,19 +39,24 @@ func main() { // } // log.Printf("Data: %+v\n", data) - listener := new(RealtimeListener) - listener.Url = POCKETBASE_REALTIME - listener.Collections = []string{COLLECTION_NAME} - listener.initialize() + // listener := new(RealtimeListener) + // listener.Url = POCKETBASE_REALTIME + // listener.Collections = []string{COLLECTION_NAME} + // listener.initialize() - status := make(chan error) + ws := new(WSConnection) + ws.url = WEBSOCKET_SERVER + ws.Open() + + status := make(chan error, 16) for { select { - case event := <-listener.Create: - log.Printf("Create event: %+v\n", event) + case event := <-ws.ReadChan: eventCopy := event + log.Printf("New event: %+v", eventCopy) go func() { - Download(eventCopy, status) + // Download(eventCopy, status) + DownloadURL(eventCopy, status) // go DownloadNative(event, status) for status := range status { log.Printf("Status: %s\n", status) diff --git a/downloader/ws-client.go b/downloader/ws-client.go new file mode 100644 index 0000000..ca9357f --- /dev/null +++ b/downloader/ws-client.go @@ -0,0 +1,63 @@ +package main + +import ( + "log" + "time" + + "github.com/gorilla/websocket" +) + +type WSConnection struct { + url string + conn *websocket.Conn + errChan chan error + ReadChan chan string +} + +func (ws *WSConnection) readMessage() { + log.Printf("Reading messages") + for { + _, message, err := ws.conn.ReadMessage() + if err != nil { + ws.errChan <- err + return + } + log.Printf("Received: %s", message) + ws.ReadChan <- string(message) + } +} + +func (ws *WSConnection) writeMessage(message string) { + err := ws.conn.WriteMessage(websocket.TextMessage, []byte(message)) + if err != nil { + log.Printf("Error during message writing: %v", err) + ws.errChan <- err + return + } +} + +func (ws *WSConnection) handleError() { + for { + err := <-ws.errChan + log.Println("Error during message reading:", err) + + time.Sleep(5 * time.Second) + ws.Open() + } +} + +func (ws *WSConnection) Open() { + log.Printf("Connecting to %s", ws.url) + conn, _, err := websocket.DefaultDialer.Dial(ws.url, nil) + if err != nil { + log.Println("Error during connection:", err) + ws.errChan <- err + return + } + log.Printf("Connected") + ws.conn = conn + ws.errChan = make(chan error) + ws.ReadChan = make(chan string, 1024) + go ws.readMessage() + go ws.handleError() +}