Integrate ws client into downloader

This commit is contained in:
2024-06-18 09:38:55 +02:00
parent 0e9ddcd7a4
commit b4d161e8e9
5 changed files with 95 additions and 9 deletions

View File

@@ -30,3 +30,14 @@ func Download(event PBEvent, status chan error) {
log.Printf("Downloaded %s (%s)", event.Record.Id, event.Record.Link)
SetDownloaded(event)
}
func DownloadURL(url string, status chan error) {
log.Printf("Downloading %s", url)
_, err := dl.Run(context.TODO(), url)
if err != nil {
status <- err
return
}
log.Printf("Downloaded %s", url)
}

View File

@@ -20,4 +20,7 @@ require (
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
)
require github.com/lrstanley/go-ytdlp v0.0.0-20240616011628-f35a10876c99
require (
github.com/gorilla/websocket v1.5.3
github.com/lrstanley/go-ytdlp v0.0.0-20240616011628-f35a10876c99
)

View File

@@ -20,6 +20,8 @@ github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=

View File

@@ -9,6 +9,8 @@ const POCKETBASE_URL = `https://pocketbase-scratch.site.quack-lab.dev/api/collec
const POCKETBASE_REALTIME = `https://pocketbase-scratch.site.quack-lab.dev/api/realtime`
const COLLECTION_NAME = "youtubedownload"
const FULL_URL = POCKETBASE_URL + "/" + COLLECTION_NAME + "/records"
const WEBSOCKET_SERVER = "ws://youtube-download-ws-server.site.quack-lab.dev/ws"
const WEBSOCKET_SERVER_ALT = "ws://localhost:8080/ws"
func main() {
log.SetFlags(log.Lmicroseconds)
@@ -37,19 +39,24 @@ func main() {
// }
// log.Printf("Data: %+v\n", data)
listener := new(RealtimeListener)
listener.Url = POCKETBASE_REALTIME
listener.Collections = []string{COLLECTION_NAME}
listener.initialize()
// listener := new(RealtimeListener)
// listener.Url = POCKETBASE_REALTIME
// listener.Collections = []string{COLLECTION_NAME}
// listener.initialize()
status := make(chan error)
ws := new(WSConnection)
ws.url = WEBSOCKET_SERVER
ws.Open()
status := make(chan error, 16)
for {
select {
case event := <-listener.Create:
log.Printf("Create event: %+v\n", event)
case event := <-ws.ReadChan:
eventCopy := event
log.Printf("New event: %+v", eventCopy)
go func() {
Download(eventCopy, status)
// Download(eventCopy, status)
DownloadURL(eventCopy, status)
// go DownloadNative(event, status)
for status := range status {
log.Printf("Status: %s\n", status)

63
downloader/ws-client.go Normal file
View File

@@ -0,0 +1,63 @@
package main
import (
"log"
"time"
"github.com/gorilla/websocket"
)
type WSConnection struct {
url string
conn *websocket.Conn
errChan chan error
ReadChan chan string
}
func (ws *WSConnection) readMessage() {
log.Printf("Reading messages")
for {
_, message, err := ws.conn.ReadMessage()
if err != nil {
ws.errChan <- err
return
}
log.Printf("Received: %s", message)
ws.ReadChan <- string(message)
}
}
func (ws *WSConnection) writeMessage(message string) {
err := ws.conn.WriteMessage(websocket.TextMessage, []byte(message))
if err != nil {
log.Printf("Error during message writing: %v", err)
ws.errChan <- err
return
}
}
func (ws *WSConnection) handleError() {
for {
err := <-ws.errChan
log.Println("Error during message reading:", err)
time.Sleep(5 * time.Second)
ws.Open()
}
}
func (ws *WSConnection) Open() {
log.Printf("Connecting to %s", ws.url)
conn, _, err := websocket.DefaultDialer.Dial(ws.url, nil)
if err != nil {
log.Println("Error during connection:", err)
ws.errChan <- err
return
}
log.Printf("Connected")
ws.conn = conn
ws.errChan = make(chan error)
ws.ReadChan = make(chan string, 1024)
go ws.readMessage()
go ws.handleError()
}