diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 8a3fbd9..51830e6 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -12,7 +12,7 @@ services: nsqd: image: nsqio/nsq:latest - command: /nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=nsqlookupd:4160 --max-msg-size=10485760 + command: /nsqd --broadcast-address=nsqd --lookupd-tcp-address=nsqlookupd:4160 --max-msg-size=10485760 ports: - "4150:4150" - "4151:4151" @@ -23,7 +23,7 @@ services: nsqadmin: image: nsqio/nsq:latest - command: /nsqadmin --lookupd-http-address=nsqlookupd:4160 + command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 ports: - "4171:4171" depends_on: diff --git a/lnsq/nsq.go b/lnsq/nsq.go index 06d5716..3fd7a80 100644 --- a/lnsq/nsq.go +++ b/lnsq/nsq.go @@ -20,25 +20,14 @@ func ConnectToNSQ(topic, channel string, handler nsq.Handler) (*nsq.Consumer, er consumer.AddHandler(handler) - lookupdAddr := fmt.Sprintf("%s:%d", config.NSQLookupHost, config.NSQLookupPort) - err = consumer.ConnectToNSQLookupds([]string{lookupdAddr}) + nsqdAddr := fmt.Sprintf("%s:%d", config.NSQHost, config.NSQPort) + err = consumer.ConnectToNSQD(nsqdAddr) if err != nil { consumer.Stop() - return nil, fmt.Errorf("failed to connect to NSQ lookupd: %w", err) + return nil, fmt.Errorf("failed to connect to NSQD: %w", err) } - timeout := 10 * time.Second - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - stats := consumer.Stats() - if stats.Connections > 0 { - return consumer, nil - } - time.Sleep(100 * time.Millisecond) - } - - consumer.Stop() - return nil, fmt.Errorf("timeout waiting for NSQ connection after %v", timeout) + return consumer, nil } func NewProducer() (*nsq.Producer, error) { @@ -52,4 +41,4 @@ func NewProducer() (*nsq.Producer, error) { } return producer, nil -} \ No newline at end of file +}