Fix issue with download order not being propagated

I assume it's because dead sockets are consuming broadcast messages
This commit is contained in:
2024-06-19 21:40:31 +02:00
parent 7cac378d04
commit 0174f3d9b9
2 changed files with 23 additions and 13 deletions

View File

@@ -10,7 +10,7 @@ import (
const TIMEOUT = 6
const IDLE_TIMEOUT = TIMEOUT * time.Second
const PING_INTERVAL = (TIMEOUT - 1) * time.Second
const PING_INTERVAL = (TIMEOUT / 2) * time.Second
type WSConnection struct {
url string
@@ -30,7 +30,7 @@ func (ws *WSConnection) messageReader() {
ws.errChan <- err
return
}
log.Printf("Received: %s", message)
log.Printf("Received: %s, %d in output channel", message, len(ws.ReadChan))
ws.ReadChan <- string(message)
}
}
@@ -42,7 +42,7 @@ func (ws *WSConnection) messageSender() {
ws.writeLock.Lock()
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
log.Printf("Sending: %s", msg)
log.Printf("Sending: %s, %d in input channel", msg, len(ws.WriteChan))
err := ws.conn.WriteMessage(websocket.TextMessage, []byte(msg))
if err != nil {
log.Printf("Error during message writing: %v", err)
@@ -58,7 +58,7 @@ func (ws *WSConnection) pinger() {
for {
time.Sleep(PING_INTERVAL)
// log.Printf("Ping")
log.Printf("Ping")
ws.writeLock.Lock()
err := ws.conn.WriteMessage(websocket.PingMessage, nil)
if err != nil {
@@ -98,7 +98,7 @@ func (ws *WSConnection) Open() {
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetPongHandler(func(string) error {
// log.Println("Pong")
log.Println("Pong")
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
return nil

View File

@@ -17,13 +17,14 @@ var wsBroadcast = make(chan string, 128)
const TIMEOUT = 6
const IDLE_TIMEOUT = TIMEOUT * time.Second
const PING_INTERVAL = (TIMEOUT - 1) * time.Second
const PING_INTERVAL = (TIMEOUT / 2) * time.Second
type WSConnection struct {
conn *websocket.Conn
writeLock sync.Mutex
ReadChan chan string
WriteChan chan string
ErrorChan chan error
}
func (ws *WSConnection) messageReader() {
@@ -32,9 +33,10 @@ func (ws *WSConnection) messageReader() {
_, message, err := ws.conn.ReadMessage()
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
if err != nil {
ws.ErrorChan <- err
return
}
log.Printf("Received: %s", message)
log.Printf("Received: %s, %d in output channel", message, len(ws.ReadChan))
ws.ReadChan <- string(message)
}
}
@@ -46,10 +48,11 @@ func (ws *WSConnection) messageSender() {
ws.writeLock.Lock()
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
log.Printf("Sending: %s", msg)
log.Printf("Sending: %s, %d in input channel", msg, len(ws.WriteChan))
err := ws.conn.WriteMessage(websocket.TextMessage, []byte(msg))
if err != nil {
log.Printf("Error during message writing: %v", err)
ws.ErrorChan <- err
return
}
ws.writeLock.Unlock()
@@ -61,11 +64,12 @@ func (ws *WSConnection) pinger() {
for {
time.Sleep(PING_INTERVAL)
// log.Printf("Ping")
log.Printf("Ping")
ws.writeLock.Lock()
err := ws.conn.WriteMessage(websocket.PingMessage, nil)
if err != nil {
log.Println("Error during ping:", err)
ws.ErrorChan <- err
return
}
ws.writeLock.Unlock()
@@ -81,7 +85,7 @@ func (ws *WSConnection) Open() {
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetPongHandler(func(string) error {
// log.Println("Pong")
log.Println("Pong")
ws.conn.SetReadDeadline(time.Now().Add(IDLE_TIMEOUT))
ws.conn.SetWriteDeadline(time.Now().Add(IDLE_TIMEOUT))
return nil
@@ -92,8 +96,14 @@ func (ws *WSConnection) Open() {
go ws.pinger()
go func() {
for {
msg := <-wsBroadcast
ws.WriteChan <- msg
select {
case err := <-ws.ErrorChan:
log.Printf("Error: %v", err)
ws.conn.Close()
return
case msg := <-wsBroadcast:
ws.WriteChan <- msg
}
}
}()
}
@@ -131,7 +141,7 @@ func handleDownload(responseWriter http.ResponseWriter, request *http.Request) {
return
}
log.Printf("Received download request: %s", req.Link)
log.Printf("Received download request: %s, %d in channel", req.Link, len(wsBroadcast))
wsBroadcast <- req.Link
}