Let go of fucking nsqlookupd
This commit is contained in:
@@ -12,7 +12,7 @@ services:
|
||||
|
||||
nsqd:
|
||||
image: nsqio/nsq:latest
|
||||
command: /nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=nsqlookupd:4160 --max-msg-size=10485760
|
||||
command: /nsqd --broadcast-address=nsqd --lookupd-tcp-address=nsqlookupd:4160 --max-msg-size=10485760
|
||||
ports:
|
||||
- "4150:4150"
|
||||
- "4151:4151"
|
||||
@@ -23,7 +23,7 @@ services:
|
||||
|
||||
nsqadmin:
|
||||
image: nsqio/nsq:latest
|
||||
command: /nsqadmin --lookupd-http-address=nsqlookupd:4160
|
||||
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
|
||||
ports:
|
||||
- "4171:4171"
|
||||
depends_on:
|
||||
|
||||
21
lnsq/nsq.go
21
lnsq/nsq.go
@@ -20,25 +20,14 @@ func ConnectToNSQ(topic, channel string, handler nsq.Handler) (*nsq.Consumer, er
|
||||
|
||||
consumer.AddHandler(handler)
|
||||
|
||||
lookupdAddr := fmt.Sprintf("%s:%d", config.NSQLookupHost, config.NSQLookupPort)
|
||||
err = consumer.ConnectToNSQLookupds([]string{lookupdAddr})
|
||||
nsqdAddr := fmt.Sprintf("%s:%d", config.NSQHost, config.NSQPort)
|
||||
err = consumer.ConnectToNSQD(nsqdAddr)
|
||||
if err != nil {
|
||||
consumer.Stop()
|
||||
return nil, fmt.Errorf("failed to connect to NSQ lookupd: %w", err)
|
||||
return nil, fmt.Errorf("failed to connect to NSQD: %w", err)
|
||||
}
|
||||
|
||||
timeout := 10 * time.Second
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
stats := consumer.Stats()
|
||||
if stats.Connections > 0 {
|
||||
return consumer, nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
consumer.Stop()
|
||||
return nil, fmt.Errorf("timeout waiting for NSQ connection after %v", timeout)
|
||||
return consumer, nil
|
||||
}
|
||||
|
||||
func NewProducer() (*nsq.Producer, error) {
|
||||
@@ -52,4 +41,4 @@ func NewProducer() (*nsq.Producer, error) {
|
||||
}
|
||||
|
||||
return producer, nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user