aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/subscribe.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/subscribe.go')
-rw-r--r--weed/mq/client/sub_client/subscribe.go17
1 files changed, 16 insertions, 1 deletions
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index 0803b2c79..370f5aa3c 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -4,17 +4,30 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "log"
+ "time"
)
// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
func (sub *TopicSubscriber) Subscribe() error {
+ index := -1
util.RetryUntil("subscribe", func() error {
+ index++
+ index = index % len(sub.bootstrapBrokers)
// ask balancer for brokers of the topic
- if err := sub.doLookup(sub.bootstrapBroker); err != nil {
+ if err := sub.doLookup(sub.bootstrapBrokers[index]); err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
+ if len(sub.brokerPartitionAssignments) == 0 {
+ if sub.waitForMoreMessage {
+ time.Sleep(1 * time.Second)
+ return fmt.Errorf("no broker partition assignments")
+ } else {
+ return nil
+ }
+ }
// treat the first broker as the topic leader
// connect to the leader broker
@@ -25,6 +38,8 @@ func (sub *TopicSubscriber) Subscribe() error {
return nil
}, func(err error) bool {
if err == io.EOF {
+ log.Printf("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
+ sub.waitForMoreMessage = false
return false
}
return true