package main import ( "log" "sync" "sync/atomic" "time" "github.com/gorilla/websocket" ) type WSConnection struct { id int32 conn *websocket.Conn writeLock sync.Mutex alive bool ReadChan chan string WriteChan chan string ErrorChan chan error IdleTimeout time.Duration PingInterval time.Duration } func NewConn(conn *websocket.Conn, server *WSServer) *WSConnection { wsconn := &WSConnection{ id: server.clientId, conn: conn, alive: true, IdleTimeout: server.IdleTimeout, PingInterval: server.PingInterval, ReadChan: make(chan string, 1024), WriteChan: make(chan string, 1024), ErrorChan: make(chan error, 1), } atomic.AddInt32(&server.clientId, 1) return wsconn } func (ws *WSConnection) Open() { go ws.messageReader() go ws.messageSender() go ws.pinger() ws.conn.SetPongHandler(func(string) error { // log.Printf("Client %d: Pong OK", ws.id) ws.conn.SetReadDeadline(time.Now().Add(ws.IdleTimeout)) return nil }) } func (ws *WSConnection) messageReader() { log.Printf("Client %d: Reading messages", ws.id) for { _, message, err := ws.conn.ReadMessage() if !ws.alive { break } ws.conn.SetReadDeadline(time.Now().Add(ws.IdleTimeout)) if err != nil { ws.ErrorChan <- err break } log.Printf("Client %d: Received: %s, %d in output channel", ws.id, message, len(ws.ReadChan)) ws.ReadChan <- string(message) } log.Printf("Client %d: Stopped reading messages", ws.id) } func (ws *WSConnection) messageSender() { log.Printf("Client %d: Sending messages", ws.id) for { msg, ok := <-ws.WriteChan if !ok || !ws.alive { break } ws.writeLock.Lock() ws.conn.SetWriteDeadline(time.Now().Add(ws.IdleTimeout)) log.Printf("Client %d: Sending: %s, %d in input channel", ws.id, msg, len(ws.WriteChan)) err := ws.conn.WriteMessage(websocket.TextMessage, []byte(msg)) if err != nil { log.Printf("Client %d: Error during message writing: %v", ws.id, err) ws.ErrorChan <- err ws.writeLock.Unlock() break } ws.writeLock.Unlock() } log.Printf("Client %d: Stopped sending messages", ws.id) } func (ws *WSConnection) pinger() { log.Printf("Client %d: Starting pinger, sleeping for %v", ws.id, ws.PingInterval) for { time.Sleep(ws.PingInterval) if !ws.alive { break } // log.Printf("Client %d: Ping", ws.id) ws.writeLock.Lock() err := ws.conn.WriteMessage(websocket.PingMessage, nil) if err != nil { log.Printf("Client %d: Error during ping: %+v", ws.id, err) ws.ErrorChan <- err break } ws.conn.SetWriteDeadline(time.Now().Add(ws.IdleTimeout)) ws.writeLock.Unlock() // log.Printf("Client %d: Ping OK", ws.id) } log.Printf("Client %d: Stopped pinger", ws.id) }