From 0174f3d9b926acaade4a926b4672c8ebb3fd0c8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Majdand=C5=BEi=C4=87?= Date: Wed, 19 Jun 2024 21:40:31 +0200 Subject: [PATCH] Fix issue with download order not being propagated I assume it's because dead sockets are consuming broadcast messages --- downloader/ws-client.go | 10 +++++----- ws-server/main.go | 26 ++++++++++++++++++-------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/downloader/ws-client.go b/downloader/ws-client.go index ba982a9..907839f 100644 --- a/downloader/ws-client.go +++ b/downloader/ws-client.go @@ -10,7 +10,7 @@ import ( const TIMEOUT = 6 const IDLE_TIMEOUT = TIMEOUT * time.Second -const PING_INTERVAL = (TIMEOUT - 1) * time.Second +const PING_INTERVAL = (TIMEOUT / 2) * time.Second type WSConnection struct { url string @@ -30,7 +30,7 @@ func (ws *WSConnection) messageReader() { ws.errChan <- err return } - log.Printf("Received: %s", message) + log.Printf("Received: %s, %d in output channel", message, len(ws.ReadChan)) ws.ReadChan <- string(message) } } @@ -42,7 +42,7 @@ func (ws *WSConnection) messageSender() { ws.writeLock.Lock() ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) - log.Printf("Sending: %s", msg) + log.Printf("Sending: %s, %d in input channel", msg, len(ws.WriteChan)) err := ws.conn.WriteMessage(websocket.TextMessage, []byte(msg)) if err != nil { log.Printf("Error during message writing: %v", err) @@ -58,7 +58,7 @@ func (ws *WSConnection) pinger() { for { time.Sleep(PING_INTERVAL) - // log.Printf("Ping") + log.Printf("Ping") ws.writeLock.Lock() err := ws.conn.WriteMessage(websocket.PingMessage, nil) if err != nil { @@ -98,7 +98,7 @@ func (ws *WSConnection) Open() { ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT)) ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) ws.conn.SetPongHandler(func(string) error { - // log.Println("Pong") + log.Println("Pong") ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT)) ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) return nil diff --git a/ws-server/main.go b/ws-server/main.go index 6aeddf1..d9a3c9b 100644 --- a/ws-server/main.go +++ b/ws-server/main.go @@ -17,13 +17,14 @@ var wsBroadcast = make(chan string, 128) const TIMEOUT = 6 const IDLE_TIMEOUT = TIMEOUT * time.Second -const PING_INTERVAL = (TIMEOUT - 1) * time.Second +const PING_INTERVAL = (TIMEOUT / 2) * time.Second type WSConnection struct { conn *websocket.Conn writeLock sync.Mutex ReadChan chan string WriteChan chan string + ErrorChan chan error } func (ws *WSConnection) messageReader() { @@ -32,9 +33,10 @@ func (ws *WSConnection) messageReader() { _, message, err := ws.conn.ReadMessage() ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT)) if err != nil { + ws.ErrorChan <- err return } - log.Printf("Received: %s", message) + log.Printf("Received: %s, %d in output channel", message, len(ws.ReadChan)) ws.ReadChan <- string(message) } } @@ -46,10 +48,11 @@ func (ws *WSConnection) messageSender() { ws.writeLock.Lock() ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) - log.Printf("Sending: %s", msg) + log.Printf("Sending: %s, %d in input channel", msg, len(ws.WriteChan)) err := ws.conn.WriteMessage(websocket.TextMessage, []byte(msg)) if err != nil { log.Printf("Error during message writing: %v", err) + ws.ErrorChan <- err return } ws.writeLock.Unlock() @@ -61,11 +64,12 @@ func (ws *WSConnection) pinger() { for { time.Sleep(PING_INTERVAL) - // log.Printf("Ping") + log.Printf("Ping") ws.writeLock.Lock() err := ws.conn.WriteMessage(websocket.PingMessage, nil) if err != nil { log.Println("Error during ping:", err) + ws.ErrorChan <- err return } ws.writeLock.Unlock() @@ -81,7 +85,7 @@ func (ws *WSConnection) Open() { ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT)) ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) ws.conn.SetPongHandler(func(string) error { - // log.Println("Pong") + log.Println("Pong") ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT)) ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT)) return nil @@ -92,8 +96,14 @@ func (ws *WSConnection) Open() { go ws.pinger() go func() { for { - msg := <-wsBroadcast - ws.WriteChan <- msg + select { + case err := <-ws.ErrorChan: + log.Printf("Error: %v", err) + ws.conn.Close() + return + case msg := <-wsBroadcast: + ws.WriteChan <- msg + } } }() } @@ -131,7 +141,7 @@ func handleDownload(responseWriter http.ResponseWriter, request *http.Request) { return } - log.Printf("Received download request: %s", req.Link) + log.Printf("Received download request: %s, %d in channel", req.Link, len(wsBroadcast)) wsBroadcast <- req.Link }