aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/subscribe.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/subscribe.go')
-rw-r--r--weed/mq/client/sub_client/subscribe.go18
1 files changed, 17 insertions, 1 deletions
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index 5669bb348..cf2294891 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -4,6 +4,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"sync"
"time"
)
@@ -20,6 +21,7 @@ func (sub *TopicSubscriber) Subscribe() error {
go sub.startProcessors()
// loop forever
+ // TODO shutdown the subscriber when not needed anymore
sub.doKeepConnectedToSubCoordinator()
return nil
@@ -66,7 +68,21 @@ func (sub *TopicSubscriber) startProcessors() {
},
},
}
- err := sub.onEachPartition(assigned, stopChan)
+
+ executors := util.NewLimitedConcurrentExecutor(int(sub.SubscriberConfig.SlidingWindowSize))
+ onDataMessageFn := func(m *mq_pb.SubscribeMessageResponse_Data) {
+ executors.Execute(func() {
+ processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
+ if processErr == nil {
+ sub.PartitionOffsetChan <- KeyedOffset{
+ Key: m.Data.Key,
+ Offset: m.Data.TsNs,
+ }
+ }
+ })
+ }
+
+ err := sub.onEachPartition(assigned, stopChan, onDataMessageFn)
if err != nil {
glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
} else {