From 7966b06f1b9092cab4ddd2891a17be7a5d6d4721 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Majdand=C5=BEi=C4=87?= Date: Thu, 27 Jun 2024 11:54:14 +0200 Subject: [PATCH] Fix issue with client reconnect and improve client resiliency --- downloader/main.go | 42 ++++++++++++++-- downloader/ws-client.go | 104 ++++++++++++++++++++++++++-------------- 2 files changed, 105 insertions(+), 41 deletions(-) diff --git a/downloader/main.go b/downloader/main.go index d13f204..8f768b8 100644 --- a/downloader/main.go +++ b/downloader/main.go @@ -8,8 +8,31 @@ import ( const WEBSOCKET_SERVER = "ws://youtube-download-ws-server.site.quack-lab.dev/ws" const WEBSOCKET_SERVER_ALT = "ws://localhost:8080/ws" -func main() { +func init() { + // log.SetFlags(log.Lmicroseconds | log.Lshortfile) log.SetFlags(log.Lmicroseconds) +} + +// func instrument() { +// numGoroutines := runtime.NumGoroutine() + +// var m runtime.MemStats +// runtime.ReadMemStats(&m) + +// malloc := float64(m.Alloc) +// ramUsedMB := malloc / 1024 / 1024 +// kbPerGoroutine := malloc / 1024 / float64(numGoroutines) + +// log.Printf("Number of active goroutines: %d; RAM used: %.2f MB; KB per goroutine: %.2f", numGoroutines, ramUsedMB, kbPerGoroutine) +// } + +func main() { + // go func() { + // for { + // instrument() + // time.Sleep(1 * time.Second) + // } + // }() // res, err := http.Get(FULL_URL) // if err != nil { @@ -39,9 +62,18 @@ func main() { // listener.Collections = []string{COLLECTION_NAME} // listener.initialize() - ws := new(WSConnection) - ws.url = WEBSOCKET_SERVER - ws.Open() + var ws WSConnection + go func() { + for { + ws := WSConnection{ + url: WEBSOCKET_SERVER_ALT, + } + ws.Open() + <-ws.Dead + log.Printf("Reconnecting in 5 seconds...") + time.Sleep(5 * time.Second) + } + }() sem := make(chan struct{}, 4) for { @@ -50,7 +82,7 @@ func main() { eventCopy := event status := make(chan error) sem <- struct{}{} - + log.Printf("New event: %+v; semaphore at: %d", eventCopy, len(sem)) go func() { defer func() { diff --git a/downloader/ws-client.go b/downloader/ws-client.go index 907839f..54fc790 100644 --- a/downloader/ws-client.go +++ b/downloader/ws-client.go @@ -13,99 +13,131 @@ const IDLE_TIMEOUT = TIMEOUT * time.Second const PING_INTERVAL = (TIMEOUT / 2) * time.Second type WSConnection struct { + alive bool url string conn *websocket.Conn errChan chan error writeLock sync.Mutex ReadChan chan string WriteChan chan string + Dead chan error } func (ws *WSConnection) messageReader() { - log.Printf("Reading messages") + log.Printf("Starting reader") for { + if !ws.alive { + break + } + _, message, err := ws.conn.ReadMessage() ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT)) if err != nil { ws.errChan <- err - return + break } + log.Printf("Received: %s, %d in output channel", message, len(ws.ReadChan)) ws.ReadChan <- string(message) } + log.Printf("Reader done") } func (ws *WSConnection) messageSender() { - log.Printf("Sending messages") + log.Printf("Starting sender") for { - msg := <-ws.WriteChan - ws.writeLock.Lock() - - ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) - log.Printf("Sending: %s, %d in input channel", msg, len(ws.WriteChan)) - err := ws.conn.WriteMessage(websocket.TextMessage, []byte(msg)) - if err != nil { - log.Printf("Error during message writing: %v", err) - ws.errChan <- err - return + msg, ok := <-ws.WriteChan + if !ok { + break } - ws.writeLock.Unlock() + ws.doSend(msg) + } + log.Printf("Sender done") +} + +func (ws *WSConnection) doSend(msg string) { + ws.writeLock.Lock() + defer ws.writeLock.Unlock() + + ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) + log.Printf("Sending: %s, %d in input channel", msg, len(ws.WriteChan)) + err := ws.conn.WriteMessage(websocket.TextMessage, []byte(msg)) + if err != nil { + log.Printf("Error during message writing: %v", err) + ws.errChan <- err + return } } func (ws *WSConnection) pinger() { log.Printf("Starting pinger, sleeping for %v", PING_INTERVAL) for { - time.Sleep(PING_INTERVAL) - - log.Printf("Ping") - ws.writeLock.Lock() - err := ws.conn.WriteMessage(websocket.PingMessage, nil) - if err != nil { - log.Println("Error during ping:", err) - ws.errChan <- err - return + if !ws.alive { + break } - ws.writeLock.Unlock() + ws.doPing() + time.Sleep(PING_INTERVAL) } + log.Printf("Pinger done") +} + +func (ws *WSConnection) doPing() { + ws.writeLock.Lock() + defer ws.writeLock.Unlock() + + // log.Printf("Ping") + err := ws.conn.WriteMessage(websocket.PingMessage, nil) + if err != nil { + log.Println("Error during ping:", err) + ws.errChan <- err + return + } + ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) + // log.Printf("Ping OK") } func (ws *WSConnection) handleError() { for { err := <-ws.errChan - log.Println("Error during message reading:", err) - - time.Sleep(5 * time.Second) - ws.Open() + log.Printf("Client error: %+v", err) + ws.alive = false + ws.conn.Close() + close(ws.ReadChan) + close(ws.WriteChan) + close(ws.errChan) + ws.Dead <- err + return } } func (ws *WSConnection) Open() { log.Printf("Connecting to %s", ws.url) + ws.Dead = make(chan error, 1) + conn, _, err := websocket.DefaultDialer.Dial(ws.url, nil) if err != nil { log.Println("Error during connection:", err) - ws.errChan <- err + ws.Dead <- err return } log.Printf("Connected") ws.conn = conn - ws.errChan = make(chan error) - ws.ReadChan = make(chan string, 1024) - ws.WriteChan = make(chan string, 1024) + ws.alive = true + + ws.errChan = make(chan error, 1) + ws.ReadChan = make(chan string, 128) + ws.WriteChan = make(chan string, 128) - ws.conn.SetReadLimit(1024) ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT)) ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) ws.conn.SetPongHandler(func(string) error { - log.Println("Pong") + // log.Println("Pong") ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT)) - ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) return nil }) + go ws.handleError() go ws.messageReader() go ws.messageSender() - go ws.handleError() go ws.pinger() }