aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-19 14:55:03 -0700
committerchrislu <chris.lu@gmail.com>2024-05-19 14:55:03 -0700
commit8d02712c630d6f23ec6540e691939069959fd0b4 (patch)
tree4687a4cb717bfb7291f23a3a6a1c8767349422d3
parent8ce2cced47d6b4d830d5e820649d6121a915ee4a (diff)
downloadseaweedfs-8d02712c630d6f23ec6540e691939069959fd0b4.tar.xz
seaweedfs-8d02712c630d6f23ec6540e691939069959fd0b4.zip
ConcurrentPartitionLimit
-rw-r--r--weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go4
-rw-r--r--weed/mq/client/cmd/weed_sub_record/subscriber_record.go4
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go30
3 files changed, 23 insertions, 15 deletions
diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
index e920e73c7..5286c229d 100644
--- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
+++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
@@ -44,10 +44,10 @@ func main() {
subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
counter := 0
- subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) {
+ subscriber.SetEachMessageFunc(func(key, value []byte) (error) {
counter++
println(string(key), "=>", string(value), counter)
- return true, nil
+ return nil
})
subscriber.SetCompletionFunc(func() {
diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
index 988e40419..a5f87a3bb 100644
--- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
+++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
@@ -70,12 +70,12 @@ func main() {
subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
counter := 0
- subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) {
+ subscriber.SetEachMessageFunc(func(key, value []byte) (error) {
counter++
record := &schema_pb.RecordValue{}
proto.Unmarshal(value, record)
fmt.Printf("record: %v\n", record)
- return true, nil
+ return nil
})
subscriber.SetCompletionFunc(func() {
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 8ee34d65a..e26997b08 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"io"
"time"
)
@@ -119,11 +120,16 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
partitionOffsetChan:= make(chan int64, 1024)
-
defer func() {
close(partitionOffsetChan)
}()
+ concurrentPartitionLimit := int(sub.ProcessorConfig.ConcurrentPartitionLimit)
+ if concurrentPartitionLimit <= 0 {
+ concurrentPartitionLimit = 1
+ }
+ executors := util.NewLimitedConcurrentExecutor(concurrentPartitionLimit)
+
go func() {
for ack := range partitionOffsetChan {
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
@@ -137,7 +143,9 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient.CloseSend()
}()
- for {
+ var lastErr error
+
+ for lastErr != nil {
// glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
@@ -149,14 +157,14 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
switch m := resp.Message.(type) {
case *mq_pb.SubscribeMessageResponse_Data:
- shouldContinue, processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
- if processErr != nil {
- return fmt.Errorf("process error: %v", processErr)
- }
- partitionOffsetChan <- m.Data.TsNs
- if !shouldContinue {
- return nil
- }
+ executors.Execute(func() {
+ processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
+ if processErr == nil {
+ partitionOffsetChan <- m.Data.TsNs
+ }else{
+ lastErr = processErr
+ }
+ })
case *mq_pb.SubscribeMessageResponse_Ctrl:
// glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
@@ -165,6 +173,6 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
}
- return nil
+ return lastErr
})
}