Compare commits

..

23 Commits

Author SHA1 Message Date
089e7f637f Limit download speed to not kill internet 2024-06-21 22:23:02 +02:00
be38b3d602 Fix error handling 2024-06-20 17:51:48 +02:00
b6e7040662 Fix broadcast 2024-06-20 17:46:43 +02:00
0174f3d9b9 Fix issue with download order not being propagated
I assume it's because dead sockets are consuming broadcast messages
2024-06-19 21:40:33 +02:00
7cac378d04 add downloader build 2024-06-18 20:14:26 +02:00
85a0dcd8f5 Update tampermonkey script 2024-06-18 18:45:18 +02:00
be1f62d27d Fix issue with server not broadcasting 2024-06-18 18:24:10 +02:00
d94b581d41 Remove unused api and realtime 2024-06-18 18:24:10 +02:00
e0635c3bc9 Remove deploy 2024-06-18 18:16:16 +02:00
eb6764538b Rework DL to use new endpoint 2024-06-18 18:10:19 +02:00
d5a70a2700 Rework websockets and add timeouts 2024-06-18 18:10:19 +02:00
b4d161e8e9 Integrate ws client into downloader 2024-06-18 09:38:55 +02:00
0e9ddcd7a4 Add server build files 2024-06-18 09:38:48 +02:00
eccd113e90 Assemble useful client-server communication 2024-06-18 01:03:11 +02:00
762e62614d Implement basic ws server and client 2024-06-18 00:47:10 +02:00
fa6be7f2cf Add ws server 2024-06-18 00:10:00 +02:00
f01e13a82d Refactor 2024-06-18 00:09:58 +02:00
2ec00ea02d Cleanup 2024-06-18 00:08:05 +02:00
bb7af20f83 Add dl to enqueue downloads 2024-06-17 21:19:55 +02:00
6e5dc31856 Add dl module 2024-06-17 21:11:23 +02:00
eb43976857 Fix issue with fake deadlock 2024-06-17 21:11:19 +02:00
6f2b472c83 Add tampermonkey script 2024-06-17 16:35:19 +02:00
3663c6c214 Fix issues with slow 2024-06-17 16:19:12 +02:00
29 changed files with 848 additions and 259 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.idea
main.exe
logs.log
ws-server/deploy.tar

75
api.go
View File

@@ -1,75 +0,0 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
)
type APIError struct {
Code int `json:"code"`
Message string `json:"message"`
Data APIErrorData `json:"data"`
}
type APIErrorData struct {
Link APIErrorLink `json:"link"`
}
type APIErrorLink struct {
Code string `json:"code"`
Message string `json:"message"`
}
func SetDownloaded(item PBEvent) (err error) {
req, err := http.NewRequestWithContext(context.Background(), "PATCH", FULL_URL+"/"+item.Record.Id, nil)
if err != nil {
log.Printf("Error creating PATCH request: %++v", err)
return err
}
req.Header.Set("Content-Type", "application/json")
partialItem := new(PBEvent)
partialItem.Record = item.Record
partialItem.Record.Downloaded = true
body, err := json.Marshal(partialItem.Record)
if err != nil {
log.Printf("Error marshalling subscription body: %++v", err)
return err
}
req.Body = io.NopCloser(bytes.NewReader(body))
client := http.Client{}
res, err := client.Do(req)
if err != nil {
log.Printf("Error sending PATCH request: %++v", err)
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Printf("Non-OK HTTP status: %d", res.StatusCode)
body, err = io.ReadAll(res.Body)
if err != nil {
log.Printf("Error reading response body: %++v", err)
return err
}
var data APIError
err = json.Unmarshal(body, &data)
if err != nil {
log.Printf("Error unmarshaling JSON: %++v", err)
return err
}
log.Printf("API error: %++v", data)
return fmt.Errorf("Non-OK HTTP status, err: %++v", data)
}
return nil
}

View File

@@ -1 +0,0 @@
docker build -t youtube-downloader .

73
dl/dl.go Normal file
View File

@@ -0,0 +1,73 @@
package main
import (
"bytes"
"context"
"encoding/json"
"io"
"log"
"net/http"
"os"
"sync"
)
const URL = `https://youtube-download-ws-server.site.quack-lab.dev/download`
type Item struct {
Link string `json:"link"`
}
var wg sync.WaitGroup
func main() {
log.SetFlags(log.Lmicroseconds)
for _, url := range os.Args[1:] {
log.Printf("Downloading %s", url)
wg.Add(1)
go Download(url)
}
wg.Wait()
}
func Download(url string) {
defer wg.Done()
req, err := http.NewRequestWithContext(context.Background(), "POST", URL, nil)
if err != nil {
log.Printf("Error creating POST request: %++v", err)
return
}
req.Header.Set("Content-Type", "application/json")
item := new(Item)
item.Link = url
body, err := json.Marshal(item)
if err != nil {
log.Printf("Error marshalling subscription body: %++v", err)
return
}
req.Body = io.NopCloser(bytes.NewReader(body))
client := http.Client{}
res, err := client.Do(req)
if err != nil {
log.Printf("Error sending POST request: %++v", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Printf("Non-OK HTTP status: %d", res.StatusCode)
body, err = io.ReadAll(res.Body)
if err != nil {
log.Printf("Error reading response body: %++v", err)
return
}
return
} else {
log.Printf("Enqueued %s", url)
}
}

3
dl/go.mod Normal file
View File

@@ -0,0 +1,3 @@
module main
go 1.22.4

1
dl/sync Normal file
View File

@@ -0,0 +1 @@
main.exe,"C:\Program Files\Git\usr\bin\dl.exe",t

View File

@@ -1,33 +0,0 @@
package main
import (
"context"
"log"
"github.com/lrstanley/go-ytdlp"
)
const OUTPUT_DIR = "C:/Users/Administrator/ytdlpVideos"
var dl = ytdlp.New().
// FormatSort("bestvideo[ext=mp4]+bestaudio[ext=m4a]").
FormatSort("res,ext:mp4:m4a").
Output("C:/Users/Administrator/ytdlpVideos/%(uploader)s/%(title)s.%(ext)s").
LimitRate("50M").
// HTTPChunkSize("20M").
MarkWatched().
SponsorblockMark("all").
PrintJSON().
RecodeVideo("mp4").
ConcurrentFragments(4)
func Download(event PBEvent, status chan error) {
_, err := dl.Run(context.TODO(), event.Record.Link)
if err != nil {
status <- err
return
}
log.Printf("Downloaded %s (%s)", event.Record.Id, event.Record.Link)
SetDownloaded(event)
}

75
downloader/api.go Normal file
View File

@@ -0,0 +1,75 @@
package main
// import (
// "bytes"
// "context"
// "encoding/json"
// "fmt"
// "io"
// "log"
// "net/http"
// )
// type APIError struct {
// Code int `json:"code"`
// Message string `json:"message"`
// Data APIErrorData `json:"data"`
// }
// type APIErrorData struct {
// Link APIErrorLink `json:"link"`
// }
// type APIErrorLink struct {
// Code string `json:"code"`
// Message string `json:"message"`
// }
// func SetDownloaded(item PBEvent) (err error) {
// req, err := http.NewRequestWithContext(context.Background(), "PATCH", FULL_URL+"/"+item.Record.Id, nil)
// if err != nil {
// log.Printf("Error creating PATCH request: %++v", err)
// return err
// }
// req.Header.Set("Content-Type", "application/json")
// partialItem := new(PBEvent)
// partialItem.Record = item.Record
// partialItem.Record.Downloaded = true
// body, err := json.Marshal(partialItem.Record)
// if err != nil {
// log.Printf("Error marshalling subscription body: %++v", err)
// return err
// }
// req.Body = io.NopCloser(bytes.NewReader(body))
// client := http.Client{}
// res, err := client.Do(req)
// if err != nil {
// log.Printf("Error sending PATCH request: %++v", err)
// return err
// }
// defer res.Body.Close()
// if res.StatusCode != http.StatusOK {
// log.Printf("Non-OK HTTP status: %d", res.StatusCode)
// body, err = io.ReadAll(res.Body)
// if err != nil {
// log.Printf("Error reading response body: %++v", err)
// return err
// }
// var data APIError
// err = json.Unmarshal(body, &data)
// if err != nil {
// log.Printf("Error unmarshaling JSON: %++v", err)
// return err
// }
// log.Printf("API error: %++v", data)
// return fmt.Errorf("Non-OK HTTP status, err: %++v", data)
// }
// return nil
// }

3
downloader/build.sh Normal file
View File

@@ -0,0 +1,3 @@
nssm stop YoutubeDownloader && \
go build main && \
nssm start YoutubeDownloader

44
downloader/download.go Normal file
View File

@@ -0,0 +1,44 @@
package main
import (
"context"
"log"
"github.com/lrstanley/go-ytdlp"
)
const OUTPUT_DIR = "C:/Users/Administrator/ytdlpVideos"
var dl = ytdlp.New().
// FormatSort("bestvideo[ext=mp4]+bestaudio[ext=m4a]").
FormatSort("res,ext:mp4:m4a").
Output("C:/Users/Administrator/ytdlpVideos/%(uploader)s/%(title)s.%(ext)s").
LimitRate("5M").
// HTTPChunkSize("20M").
MarkWatched().
SponsorblockMark("all").
RecodeVideo("mp4").
ConcurrentFragments(6)
// func Download(event PBEvent, status chan error) {
// _, err := dl.Run(context.TODO(), event.Record.Link)
// if err != nil {
// status <- err
// return
// }
// log.Printf("Downloaded %s (%s)", event.Record.Id, event.Record.Link)
// SetDownloaded(event)
// }
func DownloadURL(url string, status chan error) {
log.Printf("Downloading %s", url)
_, err := dl.Run(context.TODO(), url)
if err != nil {
status <- err
return
}
log.Printf("Downloaded %s", url)
close(status)
}

View File

@@ -20,4 +20,7 @@ require (
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
)
require github.com/lrstanley/go-ytdlp v0.0.0-20240616011628-f35a10876c99
require (
github.com/gorilla/websocket v1.5.3
github.com/lrstanley/go-ytdlp v0.0.0-20240616011628-f35a10876c99
)

View File

@@ -20,6 +20,8 @@ github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=

72
downloader/main.go Normal file
View File

@@ -0,0 +1,72 @@
package main
import (
"log"
"time"
)
const WEBSOCKET_SERVER = "ws://youtube-download-ws-server.site.quack-lab.dev/ws"
const WEBSOCKET_SERVER_ALT = "ws://localhost:8080/ws"
func main() {
log.SetFlags(log.Lmicroseconds)
// res, err := http.Get(FULL_URL)
// if err != nil {
// log.Fatal(err)
// }
// defer res.Body.Close()
// body, err := io.ReadAll(res.Body)
// if err != nil {
// log.Printf("Error reading response body: %+v\n", err)
// return
// }
// if res.StatusCode != http.StatusOK {
// log.Printf("Non-OK HTTP status: %d\nResponse body: %s\n", res.StatusCode, body)
// return
// }
// var data APIResponse
// err = json.Unmarshal(body, &data)
// if err != nil {
// log.Printf("Error unmarshaling JSON: %+v\n", err)
// return
// }
// log.Printf("Data: %+v\n", data)
// listener := new(RealtimeListener)
// listener.Url = POCKETBASE_REALTIME
// listener.Collections = []string{COLLECTION_NAME}
// listener.initialize()
ws := new(WSConnection)
ws.url = WEBSOCKET_SERVER
ws.Open()
sem := make(chan struct{}, 4)
for {
select {
case event := <-ws.ReadChan:
eventCopy := event
status := make(chan error)
sem <- struct{}{}
log.Printf("New event: %+v; semaphore at: %d", eventCopy, len(sem))
go func() {
defer func() {
<-sem
log.Printf("Semaphore at: %d", len(sem))
}()
// Download(eventCopy, status)
DownloadURL(eventCopy, status)
// go DownloadNative(event, status)
for status := range status {
log.Printf("Status: %s\n", status)
}
}()
case <-time.After(1 * time.Minute):
// Perform some action or simply continue to avoid deadlock
log.Println("Consumer is alive, but has no new events.")
}
}
}

89
downloader/realtime.go Normal file
View File

@@ -0,0 +1,89 @@
package main
// import (
// "bytes"
// "encoding/json"
// "log"
// "net/http"
// "github.com/r3labs/sse"
// )
// type RealtimeListener struct {
// Url string
// Collections []string
// Create chan PBEvent
// Update chan PBEvent
// Delete chan PBEvent
// client *sse.Client
// }
// type Subscription struct {
// ClientId string `json:"clientId"`
// Subscriptions []string `json:"subscriptions"`
// }
// func (listener RealtimeListener) handlePbEvent(msg *sse.Event) {
// pbEvent := new(PBEvent)
// err := json.Unmarshal(msg.Data, &pbEvent)
// if err != nil {
// log.Printf("Error unmarshalling event: %v\n", err)
// return
// }
// log.Printf("Received event: %++v", pbEvent)
// if pbEvent.ClientId != "" {
// listener.doSubscribe(pbEvent.ClientId)
// }
// if pbEvent.Action != "" {
// go listener.shipEvent(*pbEvent)
// }
// }
// func (listener RealtimeListener) shipEvent(event PBEvent) {
// switch event.Action {
// case "create":
// listener.Create <- event
// case "update":
// listener.Update <- event
// case "delete":
// listener.Delete <- event
// default:
// log.Printf("Unknown action: %v\n", event.Action)
// }
// }
// func (listener RealtimeListener) doSubscribe(clientId string) {
// subscription := Subscription{
// ClientId: clientId,
// Subscriptions: listener.Collections,
// }
// log.Printf("Subscribing client: %v to %++v", clientId, subscription)
// body, err := json.Marshal(subscription)
// if err != nil {
// log.Printf("Error marshalling subscription body: %v\n", err)
// return
// }
// resp, err := http.Post(POCKETBASE_REALTIME, "application/json", bytes.NewBuffer(body))
// if err != nil {
// log.Printf("Error posting subscription: %v\n", err)
// return
// }
// defer resp.Body.Close()
// if resp.StatusCode != http.StatusNoContent {
// log.Printf("Subscription request failed with status: %v\n", resp.Status)
// }
// }
// func (listener *RealtimeListener) initialize() {
// listener.Update = make(chan PBEvent, 32)
// listener.Create = make(chan PBEvent, 32)
// listener.Delete = make(chan PBEvent, 32)
// log.Print("Initialized")
// listener.client = sse.NewClient(listener.Url)
// go listener.client.Subscribe("", listener.handlePbEvent)
// }

111
downloader/ws-client.go Normal file
View File

@@ -0,0 +1,111 @@
package main
import (
"log"
"sync"
"time"
"github.com/gorilla/websocket"
)
const TIMEOUT = 6
const IDLE_TIMEOUT = TIMEOUT * time.Second
const PING_INTERVAL = (TIMEOUT / 2) * time.Second
type WSConnection struct {
url string
conn *websocket.Conn
errChan chan error
writeLock sync.Mutex
ReadChan chan string
WriteChan chan string
}
func (ws *WSConnection) messageReader() {
log.Printf("Reading messages")
for {
_, message, err := ws.conn.ReadMessage()
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
if err != nil {
ws.errChan <- err
return
}
log.Printf("Received: %s, %d in output channel", message, len(ws.ReadChan))
ws.ReadChan <- string(message)
}
}
func (ws *WSConnection) messageSender() {
log.Printf("Sending messages")
for {
msg := <-ws.WriteChan
ws.writeLock.Lock()
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
log.Printf("Sending: %s, %d in input channel", msg, len(ws.WriteChan))
err := ws.conn.WriteMessage(websocket.TextMessage, []byte(msg))
if err != nil {
log.Printf("Error during message writing: %v", err)
ws.errChan <- err
return
}
ws.writeLock.Unlock()
}
}
func (ws *WSConnection) pinger() {
log.Printf("Starting pinger, sleeping for %v", PING_INTERVAL)
for {
time.Sleep(PING_INTERVAL)
log.Printf("Ping")
ws.writeLock.Lock()
err := ws.conn.WriteMessage(websocket.PingMessage, nil)
if err != nil {
log.Println("Error during ping:", err)
ws.errChan <- err
return
}
ws.writeLock.Unlock()
}
}
func (ws *WSConnection) handleError() {
for {
err := <-ws.errChan
log.Println("Error during message reading:", err)
time.Sleep(5 * time.Second)
ws.Open()
}
}
func (ws *WSConnection) Open() {
log.Printf("Connecting to %s", ws.url)
conn, _, err := websocket.DefaultDialer.Dial(ws.url, nil)
if err != nil {
log.Println("Error during connection:", err)
ws.errChan <- err
return
}
log.Printf("Connected")
ws.conn = conn
ws.errChan = make(chan error)
ws.ReadChan = make(chan string, 1024)
ws.WriteChan = make(chan string, 1024)
ws.conn.SetReadLimit(1024)
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetPongHandler(func(string) error {
log.Println("Pong")
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
return nil
})
go ws.messageReader()
go ws.messageSender()
go ws.handleError()
go ws.pinger()
}

62
main.go
View File

@@ -1,62 +0,0 @@
package main
import (
"encoding/json"
"io"
"log"
"net/http"
"time"
)
const POCKETBASE_URL = `https://pocketbase-scratch.site.quack-lab.dev/api/collections`
const POCKETBASE_REALTIME = `https://pocketbase-scratch.site.quack-lab.dev/api/realtime`
const COLLECTION_NAME = "youtubedownload"
const FULL_URL = POCKETBASE_URL + "/" + COLLECTION_NAME + "/records"
func main() {
log.SetFlags(log.Lmicroseconds)
log.Println(FULL_URL)
res, err := http.Get(FULL_URL)
if err != nil {
log.Fatal(err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
log.Printf("Error reading response body: %+v\n", err)
return
}
if res.StatusCode != http.StatusOK {
log.Printf("Non-OK HTTP status: %d\nResponse body: %s\n", res.StatusCode, body)
return
}
var data APIResponse
err = json.Unmarshal(body, &data)
if err != nil {
log.Printf("Error unmarshaling JSON: %+v\n", err)
return
}
// log.Printf("Data: %+v\n", data)
listener := new(RealtimeListener)
listener.Url = POCKETBASE_REALTIME
listener.Collections = []string{COLLECTION_NAME}
listener.initialize()
status := make(chan error)
for event := range listener.Create {
log.Printf("Create event: %+v\n", event)
eventCopy := event
go func() {
Download(eventCopy, status)
// go DownloadNative(event, status)
for status := range status {
log.Printf("Status: %s\n", status)
}
}()
}
time.Sleep(1 * time.Hour)
}

View File

@@ -1,87 +0,0 @@
package main
import (
"bytes"
"encoding/json"
"log"
"net/http"
"github.com/r3labs/sse"
)
type RealtimeListener struct {
Url string
Collections []string
Create chan PBEvent
Update chan PBEvent
Delete chan PBEvent
client *sse.Client
}
type Subscription struct {
ClientId string `json:"clientId"`
Subscriptions []string `json:"subscriptions"`
}
func (listener RealtimeListener) handlePbEvent(msg *sse.Event) {
var pbEvent = new(PBEvent)
err := json.Unmarshal(msg.Data, &pbEvent)
if err != nil {
log.Printf("Error unmarshalling event: %v\n", err)
return
}
if pbEvent.ClientId != "" {
listener.doSubscribe(pbEvent.ClientId)
}
if pbEvent.Action != "" {
go listener.shipEvent(*pbEvent)
}
}
func (listener RealtimeListener) shipEvent(event PBEvent) {
switch event.Action {
case "create":
listener.Create <- event
case "update":
listener.Update <- event
case "delete":
listener.Delete <- event
default:
log.Printf("Unknown action: %v\n", event.Action)
}
}
func (listener RealtimeListener) doSubscribe(clientId string) {
subscription := Subscription{
ClientId: clientId,
Subscriptions: listener.Collections,
}
body, err := json.Marshal(subscription)
if err != nil {
log.Printf("Error marshalling subscription body: %v\n", err)
return
}
resp, err := http.Post(POCKETBASE_REALTIME, "application/json", bytes.NewBuffer(body))
if err != nil {
log.Printf("Error posting subscription: %v\n", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
log.Printf("Subscription request failed with status: %v\n", resp.Status)
}
}
func (listener *RealtimeListener) initialize() {
listener.Update = make(chan PBEvent, 32)
listener.Create = make(chan PBEvent, 32)
listener.Delete = make(chan PBEvent, 32)
log.Print("initialized")
listener.client = sse.NewClient(listener.Url)
go listener.client.Subscribe("", listener.handlePbEvent)
}

86
tampermonkey.js Normal file
View File

@@ -0,0 +1,86 @@
// ==UserScript==
// @name Youtube Downloader
// @author Cyka
// @match https://www.youtube.com/*
// @version 1.19
// @run-at document-idle
// @noframes
// ==/UserScript==
const URL = `https://youtube-download-ws-server.site.quack-lab.dev/download`;
function waitForElement(element, selector) {
return new Promise((resolve) => {
if (element.querySelector(selector)) {
return resolve(element.querySelector(selector));
}
const observer = new MutationObserver((mutations) => {
if (element.querySelector(selector)) {
resolve(element.querySelector(selector));
observer.disconnect();
}
});
observer.observe(element, {
childList: true,
subtree: true,
});
});
}
function parseVideo(videoElement) {
hookVideo(videoElement);
}
function hookVideo(videoElement) {
videoElement.addEventListener(
"mousedown",
function (e) {
if (e.button === 1) {
e.preventDefault();
e.stopPropagation();
e.stopImmediatePropagation();
const link = videoElement.querySelector("a#video-title-link").href;
console.log(link);
fetch(URL, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
link: link,
}),
}).then((res) => {
console.log(res);
res.json().then((data) => console.log(data));
});
}
},
false
);
}
async function main() {
const videosContainer = await waitForElement(document, "div#contents.style-scope.ytd-rich-grid-renderer");
for (const video of videosContainer.querySelectorAll("ytd-rich-item-renderer")) {
parseVideo(video);
}
new MutationObserver((mutations) => {
mutations = mutations.filter((mutation) => mutation.addedNodes.length > 0);
for (const mutation of mutations) {
if (mutation.target.tagName == "YTD-RICH-ITEM-RENDERER") {
parseVideo(mutation.target);
}
}
}).observe(videosContainer, {
childList: true,
subtree: true,
});
}
main();

5
ws-client/go.mod Normal file
View File

@@ -0,0 +1,5 @@
module main
go 1.22.4
require github.com/gorilla/websocket v1.5.3

2
ws-client/go.sum Normal file
View File

@@ -0,0 +1,2 @@
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

73
ws-client/main.go Normal file
View File

@@ -0,0 +1,73 @@
package main
import (
"log"
"time"
"github.com/gorilla/websocket"
)
type WSConnection struct {
url string
conn *websocket.Conn
errChan chan error
}
func (ws *WSConnection) readMessage() {
log.Printf("Reading messages")
for {
_, message, err := ws.conn.ReadMessage()
if err != nil {
ws.errChan <- err
return
}
log.Printf("Received: %s", message)
}
}
func (ws *WSConnection) writeMessage(message string) {
err := ws.conn.WriteMessage(websocket.TextMessage, []byte(message))
if err != nil {
log.Printf("Error during message writing: %v", err)
ws.errChan <- err
return
}
}
func (ws *WSConnection) handleError() {
for {
err := <-ws.errChan
log.Println("Error during message reading:", err)
time.Sleep(5 * time.Second)
ws.open()
}
}
func (ws *WSConnection) open() {
log.Printf("Connecting to %s", ws.url)
conn, _, err := websocket.DefaultDialer.Dial(ws.url, nil)
if err != nil {
log.Println("Error during connection:", err)
ws.errChan <- err
return
}
log.Printf("Connected")
ws.conn = conn
ws.errChan = make(chan error)
go ws.readMessage()
go ws.handleError()
}
func main() {
log.SetFlags(log.Lmicroseconds)
wsConn := WSConnection{
url: "ws://localhost:8080/ws",
}
wsConn.open()
for {
log.Printf("zzz...")
time.Sleep(30 * time.Second)
}
}

3
ws-server/build.sh Normal file
View File

@@ -0,0 +1,3 @@
# docker build -t youtube-download-ws-server .
tar -cf deploy.tar captain-definition dockerfile main.go go.mod go.sum

View File

@@ -0,0 +1,4 @@
{
"schemaVersion": 2,
"dockerfilePath": "./dockerfile"
}

21
ws-server/dockerfile Normal file
View File

@@ -0,0 +1,21 @@
FROM golang:1.22.4 as base
WORKDIR $GOPATH/src/app/
COPY . .
RUN go mod download
RUN go mod verify
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /main .
FROM scratch
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=base /etc/passwd /etc/passwd
COPY --from=base /etc/group /etc/group
COPY --from=base /main .
CMD ["/main"]

5
ws-server/go.mod Normal file
View File

@@ -0,0 +1,5 @@
module main
go 1.22.4
require github.com/gorilla/websocket v1.5.3

2
ws-server/go.sum Normal file
View File

@@ -0,0 +1,2 @@
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

169
ws-server/main.go Normal file
View File

@@ -0,0 +1,169 @@
package main
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{}
var wsBroadcast = make(chan string, 128)
var connections = make(map[*WSConnection]bool)
const TIMEOUT = 6
const IDLE_TIMEOUT = TIMEOUT * time.Second
const PING_INTERVAL = (TIMEOUT / 2) * time.Second
type WSConnection struct {
conn *websocket.Conn
writeLock sync.Mutex
ReadChan chan string
WriteChan chan string
ErrorChan chan error
}
func (ws *WSConnection) messageReader() {
log.Printf("Reading messages")
for {
_, message, err := ws.conn.ReadMessage()
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
if err != nil {
ws.ErrorChan <- err
return
}
log.Printf("Received: %s, %d in output channel", message, len(ws.ReadChan))
ws.ReadChan <- string(message)
}
}
func (ws *WSConnection) messageSender() {
log.Printf("Sending messages")
for {
msg := <-ws.WriteChan
ws.writeLock.Lock()
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
log.Printf("Sending: %s, %d in input channel", msg, len(ws.WriteChan))
err := ws.conn.WriteMessage(websocket.TextMessage, []byte(msg))
if err != nil {
log.Printf("Error during message writing: %v", err)
ws.ErrorChan <- err
return
}
ws.writeLock.Unlock()
}
}
func (ws *WSConnection) pinger() {
log.Printf("Starting pinger, sleeping for %v", PING_INTERVAL)
for {
time.Sleep(PING_INTERVAL)
log.Printf("Ping")
ws.writeLock.Lock()
err := ws.conn.WriteMessage(websocket.PingMessage, nil)
if err != nil {
log.Println("Error during ping:", err)
ws.ErrorChan <- err
return
}
ws.writeLock.Unlock()
}
}
func (ws *WSConnection) Open() {
log.Printf("Client connected")
ws.ReadChan = make(chan string, 1024)
ws.WriteChan = make(chan string, 1024)
ws.ErrorChan = make(chan error, 16)
ws.conn.SetReadLimit(1024)
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetPongHandler(func(string) error {
log.Println("Pong")
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
return nil
})
connections[ws] = true
go ws.messageReader()
go ws.messageSender()
go ws.pinger()
go func() {
for {
select {
case err := <-ws.ErrorChan:
log.Printf("Error: %v", err)
ws.conn.Close()
log.Printf("Client disconnected")
connections[ws] = false
return
// case msg := <-wsBroadcast:
// ws.WriteChan <- msg
}
}
}()
}
func wsHandler(responseWriter http.ResponseWriter, request *http.Request) {
conn, err := upgrader.Upgrade(responseWriter, request, nil)
if err != nil {
fmt.Println("Error during connection upgrade:", err)
return
}
ws := new(WSConnection)
ws.conn = conn
ws.Open()
}
type DownloadReq struct {
Link string `json:"link"`
}
func handleDownload(responseWriter http.ResponseWriter, request *http.Request) {
body, err := io.ReadAll(request.Body)
if err != nil {
log.Printf("Error reading request body: %v", err)
http.Error(responseWriter, "Error reading request body", http.StatusBadRequest)
return
}
defer request.Body.Close()
req := DownloadReq{}
err = json.Unmarshal(body, &req)
if err != nil {
log.Printf("Error parsing JSON: %v", err)
http.Error(responseWriter, "Error parsing JSON", http.StatusBadRequest)
return
}
log.Printf("Received download request: %s, %d in channel", req.Link, len(wsBroadcast))
go func() {
for ws := range connections {
ws.WriteChan <- req.Link
}
}()
// wsBroadcast <- req.Link
}
func main() {
log.SetFlags(log.Lmicroseconds)
http.HandleFunc("/ws", wsHandler)
http.HandleFunc("/download", handleDownload)
log.Println("Server starting on :8080")
err := http.ListenAndServe(":8080", nil)
if err != nil {
log.Println("Error starting server:", err)
}
}