From 0315a0877bcbc2394f01c8648709d30a39cebf46 Mon Sep 17 00:00:00 2001 From: PhatPhuckDave Date: Fri, 2 Aug 2024 11:13:40 +0200 Subject: [PATCH] Implement some sort of rate limiting --- client/client.go | 71 ++++++++++++++++++++++++++++++++++++------------ go.mod | 5 +++- go.sum | 2 ++ main.go | 14 ++++++---- 4 files changed, 69 insertions(+), 23 deletions(-) diff --git a/client/client.go b/client/client.go index 2f5694b..edd9dde 100644 --- a/client/client.go +++ b/client/client.go @@ -1,35 +1,67 @@ package client import ( + "context" "fmt" "log" "net" "smpptester/pdu" + "smpptester/utils" "time" + + "golang.org/x/time/rate" ) -const RETRY_TIMEOUT = 1 * time.Second +var ( + SMPP_CLIENT_RETRY_TIMEOUT = 1 * time.Second + SMPP_CLIENT_SEND_QUEUE_SIZE = 1024 * 8 + SMPP_CLIENT_RECEIVE_QUEUE_SIZE = 1024 * 8 + SMPP_CLIENT_ASYNC_SEND_WORKERS = 16 + SMPP_CLIENT_SEND_LIMIT = 4 +) type SMPPClient struct { - Id int - Connected bool - enabled bool - running bool - conn net.Conn - port string - log log.Logger - dcErr chan (error) + Id int + Connected bool + SendQueue chan pdu.PDU + ReceiveQueue chan pdu.PDU + SendLimit *utils.ReactiveValue[int] + + sendLimiter *rate.Limiter + enabled bool + running bool + conn net.Conn + port string + log log.Logger + dcErr chan (error) } func NewSMPPClient(port string, id int) *SMPPClient { client := &SMPPClient{ - Id: id, - port: port, - log: log.Logger{}, - dcErr: make(chan error, 128), + Id: id, + SendQueue: make(chan pdu.PDU, SMPP_CLIENT_SEND_QUEUE_SIZE), + ReceiveQueue: make(chan pdu.PDU, SMPP_CLIENT_RECEIVE_QUEUE_SIZE), + SendLimit: utils.NewReactiveValue[int](SMPP_CLIENT_SEND_LIMIT), + + sendLimiter: rate.NewLimiter(rate.Limit(SMPP_CLIENT_SEND_LIMIT), SMPP_CLIENT_SEND_LIMIT), + port: port, + log: log.Logger{}, + dcErr: make(chan error, 128), } - client.log = *log.New(log.Writer(), "", log.LstdFlags) + client.log = *log.New(log.Writer(), "", log.LstdFlags|log.Lshortfile) client.log.SetPrefix(fmt.Sprintf("SMPP client %d: ", client.Id)) + + go func() { + limit := client.SendLimit.Subscribe() + defer client.SendLimit.Unsubscribe(limit) + for { + v := <-limit + client.log.Printf("Send limit changed to %d", *v) + client.sendLimiter.SetLimit(rate.Limit(*v)) + client.sendLimiter.SetBurst(*v) + } + }() + return client } @@ -78,8 +110,8 @@ func (c *SMPPClient) start() { err := <-c.dcErr c.Connected = false if c.enabled { - c.log.Printf("Disconnected: '%v' trying again in %d second(s)", err, int(RETRY_TIMEOUT.Seconds())) - time.Sleep(RETRY_TIMEOUT) + c.log.Printf("Disconnected: '%v' trying again in %d second(s)", err, int(SMPP_CLIENT_RETRY_TIMEOUT.Seconds())) + time.Sleep(SMPP_CLIENT_RETRY_TIMEOUT) } else { c.log.Printf("Disconnected: '%v' & client is disabled, quitting", err) break @@ -117,12 +149,17 @@ func (c *SMPPClient) Send(pdata pdu.PDU) error { return fmt.Errorf("failed to encode PDU: %w", err) } + err = c.sendLimiter.Wait(context.Background()) + if err != nil { + return fmt.Errorf("failed to wait for rate limiter: %w", err) + } + _, err = c.conn.Write(buf.Bytes()) if err != nil { c.dcErr <- err return fmt.Errorf("failed to send PDU: %w", err) } - c.log.Printf("SMPP client %d sent PDU: %+v", c.Id, pdata) + // c.log.Printf("SMPP client %d sent PDU: %+v", c.Id, pdata) return nil } diff --git a/go.mod b/go.mod index 27b1fc7..0b92b48 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module smpptester go 1.22.4 -require github.com/yuin/gopher-lua v1.1.1 +require ( + github.com/yuin/gopher-lua v1.1.1 + golang.org/x/time v0.5.0 +) diff --git a/go.sum b/go.sum index e7daa0c..2648d80 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= diff --git a/main.go b/main.go index 6ae51c7..2744178 100644 --- a/main.go +++ b/main.go @@ -175,12 +175,16 @@ func main() { client.Enable() for { if client.Connected { - err := client.Send(submit) - if err != nil { - log.Printf("Failed to send PDU: %v", err) - } + // err := client.Send(submit) + // if err != nil { + // log.Printf("Failed to send PDU: %v", err) + // } + go client.Send(submit) + // if rand.Int() % 1000 == 0 { + // client.SendLimit.Set(rand.Int() % 1000) + // } } - time.Sleep(10 * time.Millisecond) + time.Sleep(500 * time.Millisecond) } log.Println("Are we done")