diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-20 11:07:54 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-20 11:07:54 -0700 |
| commit | d45b1d058db19d7adbe258147434a391eb1ceb82 (patch) | |
| tree | 7fea408e2bcba1c229a2028e9443d7a24fbb14fd | |
| parent | 362219688127097beef288a69bf8df45566e1ec7 (diff) | |
| download | seaweedfs-d45b1d058db19d7adbe258147434a391eb1ceb82.tar.xz seaweedfs-d45b1d058db19d7adbe258147434a391eb1ceb82.zip | |
minor
| -rw-r--r-- | weed/mq/client/sub_client/connect_to_sub_coordinator.go | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 118d91bac..53ac27418 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -95,6 +95,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig return fmt.Errorf("create subscribe client: %v", err) } + perPartitionConcurrency := sub.ProcessorConfig.PerPartitionConcurrency + if perPartitionConcurrency <= 0 { + perPartitionConcurrency = 1 + } + if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Init{ Init: &mq_pb.SubscribeMessageRequest_InitMessage{ @@ -107,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig }, Filter: sub.ContentConfig.Filter, FollowerBroker: assigned.FollowerBroker, - Concurrency: sub.ProcessorConfig.PerPartitionConcurrency, + Concurrency: perPartitionConcurrency, }, }, }); err != nil { @@ -124,12 +129,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig defer func() { close(partitionOffsetChan) }() - - perPartitionConcurrency := int(sub.ProcessorConfig.PerPartitionConcurrency) - if perPartitionConcurrency <= 0 { - perPartitionConcurrency = 1 - } - executors := util.NewLimitedConcurrentExecutor(perPartitionConcurrency) + executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency)) go func() { for ack := range partitionOffsetChan { |
