aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_sub.go7
1 files changed, 6 insertions, 1 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 8a6acadb9..1a34d49e7 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -21,6 +21,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
+ waitIntervalCount := 0
var localTopicPartition *topic.LocalPartition
for localTopicPartition == nil {
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
@@ -32,7 +33,11 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
if localTopicPartition != nil {
break
}
- time.Sleep(337 * time.Millisecond)
+ waitIntervalCount++
+ if waitIntervalCount > 10 {
+ waitIntervalCount = 10
+ }
+ time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():