package client import ( "context" "fmt" "log" "net" "smpptester/pdu" "smpptester/utils" "time" "golang.org/x/time/rate" ) var ( SMPP_CLIENT_RETRY_TIMEOUT = 1 * time.Second SMPP_CLIENT_SEND_QUEUE_SIZE = 1024 * 8 SMPP_CLIENT_RECEIVE_QUEUE_SIZE = 1024 * 8 SMPP_CLIENT_ASYNC_SEND_WORKERS = 16 SMPP_CLIENT_SEND_LIMIT = 4 ) type SMPPClient struct { Id int Connected bool SendQueue chan pdu.PDU ReceiveQueue chan pdu.PDU SendLimit *utils.ReactiveValue[int] sendLimiter *rate.Limiter enabled bool running bool conn net.Conn port string log log.Logger dcErr chan (error) } func NewSMPPClient(port string, id int) *SMPPClient { client := &SMPPClient{ Id: id, SendQueue: make(chan pdu.PDU, SMPP_CLIENT_SEND_QUEUE_SIZE), ReceiveQueue: make(chan pdu.PDU, SMPP_CLIENT_RECEIVE_QUEUE_SIZE), SendLimit: utils.NewReactiveValue[int](SMPP_CLIENT_SEND_LIMIT), sendLimiter: rate.NewLimiter(rate.Limit(SMPP_CLIENT_SEND_LIMIT), SMPP_CLIENT_SEND_LIMIT), port: port, log: log.Logger{}, dcErr: make(chan error, 128), } client.log = *log.New(log.Writer(), "", log.LstdFlags|log.Lshortfile) client.log.SetPrefix(fmt.Sprintf("SMPP client %d: ", client.Id)) go func() { limit := client.SendLimit.Subscribe() defer client.SendLimit.Unsubscribe(limit) for { v := <-limit client.log.Printf("Send limit changed to %d", *v) client.sendLimiter.SetLimit(rate.Limit(*v)) client.sendLimiter.SetBurst(*v) } }() return client } func (c *SMPPClient) Enable() { if c.enabled { c.log.Printf("SMPP client %d is already enabled", c.Id) return } c.enabled = true c.start() c.log.Printf("SMPP client %d enabled", c.Id) } func (c *SMPPClient) Disable() { if !c.enabled { c.log.Printf("SMPP client %d is already disabled", c.Id) return } c.enabled = false c.dcErr <- fmt.Errorf("client is disabled") if c.conn != nil { c.conn.Close() } c.log.Printf("SMPP client %d disabled", c.Id) } func (c *SMPPClient) start() { if c.running { c.log.Printf("SMPP client %d is already running", c.Id) return } go func() { c.running = true defer func() { c.running = false }() c.log.Printf("SMPP client %d started", c.Id) for { if c.enabled && !c.Connected { c.log.Printf("Trying to connect to %s", c.port) err := c.connect() if err != nil { c.dcErr <- err } } err := <-c.dcErr c.Connected = false if c.enabled { c.log.Printf("Disconnected: '%v' trying again in %d second(s)", err, int(SMPP_CLIENT_RETRY_TIMEOUT.Seconds())) time.Sleep(SMPP_CLIENT_RETRY_TIMEOUT) } else { c.log.Printf("Disconnected: '%v' & client is disabled, quitting", err) break } } }() } func (c *SMPPClient) connect() error { if !c.enabled { return fmt.Errorf("client is not enabled") } if c.Connected { return fmt.Errorf("client is already connected") } conn, err := net.Dial("tcp", c.port) if err != nil { return fmt.Errorf("failed to connect to SMPP server: %w", err) } c.log.Printf("SMPP client %d connected to %s", c.Id, c.port) c.Connected = true c.conn = conn return nil } func (c *SMPPClient) Send(pdata pdu.PDU) error { if c.conn == nil { return fmt.Errorf("connection is not established") } pdata.UpdateSize() buf := pdu.ByteBufferPool.Get(pdata.Size()) err := pdata.Encode(buf) if err != nil { return fmt.Errorf("failed to encode PDU: %w", err) } err = c.sendLimiter.Wait(context.Background()) if err != nil { return fmt.Errorf("failed to wait for rate limiter: %w", err) } _, err = c.conn.Write(buf.Bytes()) if err != nil { c.dcErr <- err return fmt.Errorf("failed to send PDU: %w", err) } // c.log.Printf("SMPP client %d sent PDU: %+v", c.Id, pdata) return nil }