aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-12-31 17:42:44 -0800
committerchrislu <chris.lu@gmail.com>2023-12-31 17:42:44 -0800
commit32bc8d6a38bbb908429c2004512758910a8fd15b (patch)
tree8cba416ad7f11b0d5287b9e56b6ad9fef3b40262
parent458ddbf919bd0530b61c1377ea883dca22faa761 (diff)
downloadseaweedfs-32bc8d6a38bbb908429c2004512758910a8fd15b.tar.xz
seaweedfs-32bc8d6a38bbb908429c2004512758910a8fd15b.zip
adjust wait time
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go15
1 files changed, 10 insertions, 5 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 3953090f2..fa58fbdff 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -12,6 +12,7 @@ import (
)
func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
+ waitTime := 1 * time.Second
for {
for _, broker := range sub.bootstrapBrokers {
// TODO find the balancer
@@ -25,6 +26,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
glog.V(1).Infof("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
return err
}
+ waitTime = 1 * time.Second
// Maybe later: subscribe to multiple topics instead of just one
@@ -61,8 +63,11 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
return nil
})
}
- print("z")
- time.Sleep(3 * time.Second)
+ glog.V(4).Infof("subscriber %s/%s/%s waiting for more assignments", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ if waitTime < 10*time.Second {
+ waitTime += 1 * time.Second
+ }
+ time.Sleep(waitTime)
}
}
@@ -80,7 +85,7 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
go func(partition *mq_pb.Partition, broker string) {
defer wg.Done()
defer func() { <-semaphore }()
- glog.V(0).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
+ glog.V(1).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
sub.onEachPartition(partition, broker)
}(assigned.Partition, assigned.Broker)
}
@@ -117,7 +122,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return fmt.Errorf("create subscribe client: %v", err)
}
- fmt.Printf("subscriber %s/%s/%s connected to partition %+v at %v\n", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
+ glog.V(1).Infof("subscriber %s/%s/%s connected to partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
@@ -134,7 +139,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
}()
for {
- fmt.Printf("subscriber %s/%s/%s waiting for message\n", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ glog.V(3).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
return fmt.Errorf("subscribe error: %v", err)