aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-20 11:07:54 -0700
committerchrislu <chris.lu@gmail.com>2024-05-20 11:07:54 -0700
commitd45b1d058db19d7adbe258147434a391eb1ceb82 (patch)
tree7fea408e2bcba1c229a2028e9443d7a24fbb14fd
parent362219688127097beef288a69bf8df45566e1ec7 (diff)
downloadseaweedfs-d45b1d058db19d7adbe258147434a391eb1ceb82.tar.xz
seaweedfs-d45b1d058db19d7adbe258147434a391eb1ceb82.zip
minor
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go14
1 files changed, 7 insertions, 7 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 118d91bac..53ac27418 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -95,6 +95,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
return fmt.Errorf("create subscribe client: %v", err)
}
+ perPartitionConcurrency := sub.ProcessorConfig.PerPartitionConcurrency
+ if perPartitionConcurrency <= 0 {
+ perPartitionConcurrency = 1
+ }
+
if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
@@ -107,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
},
Filter: sub.ContentConfig.Filter,
FollowerBroker: assigned.FollowerBroker,
- Concurrency: sub.ProcessorConfig.PerPartitionConcurrency,
+ Concurrency: perPartitionConcurrency,
},
},
}); err != nil {
@@ -124,12 +129,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
defer func() {
close(partitionOffsetChan)
}()
-
- perPartitionConcurrency := int(sub.ProcessorConfig.PerPartitionConcurrency)
- if perPartitionConcurrency <= 0 {
- perPartitionConcurrency = 1
- }
- executors := util.NewLimitedConcurrentExecutor(perPartitionConcurrency)
+ executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
go func() {
for ack := range partitionOffsetChan {