aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-22 11:56:55 -0800
committerchrislu <chris.lu@gmail.com>2024-01-22 11:56:55 -0800
commit3ff6b31d94d4a3fc006acd33c8bb9fdb8d96f023 (patch)
tree274e000f88470bf8e2ff0748ba6ec3ec4bd9af38
parent2beaa2d0b3d31d4686139f0b4c151e348ec6d340 (diff)
downloadseaweedfs-3ff6b31d94d4a3fc006acd33c8bb9fdb8d96f023.tar.xz
seaweedfs-3ff6b31d94d4a3fc006acd33c8bb9fdb8d96f023.zip
sub broker wait for partition
-rw-r--r--weed/mq/broker/broker_grpc_sub.go7
1 files changed, 6 insertions, 1 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 8a6acadb9..1a34d49e7 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -21,6 +21,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
+ waitIntervalCount := 0
var localTopicPartition *topic.LocalPartition
for localTopicPartition == nil {
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
@@ -32,7 +33,11 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
if localTopicPartition != nil {
break
}
- time.Sleep(337 * time.Millisecond)
+ waitIntervalCount++
+ if waitIntervalCount > 10 {
+ waitIntervalCount = 10
+ }
+ time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
case <-ctx.Done():