diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-20 09:33:37 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-20 09:33:37 -0700 |
| commit | d8ab22012a1bcaa49de557f05808da3deddcfb1f (patch) | |
| tree | 4129ae83b0adaa65dc63da7b83c1a46fd22d5578 /weed/mq | |
| parent | 2b07a40da5893cba380ac2d9a51e550af6b73e5a (diff) | |
| download | seaweedfs-d8ab22012a1bcaa49de557f05808da3deddcfb1f.tar.xz seaweedfs-d8ab22012a1bcaa49de557f05808da3deddcfb1f.zip | |
track offset
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 8 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go | 2 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_sub_record/subscriber_record.go | 3 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/connect_to_sub_coordinator.go | 5 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscribe.go | 2 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 3 | ||||
| -rw-r--r-- | weed/mq/sub_coordinator/inflight_message_tracker.go | 4 |
7 files changed, 19 insertions, 8 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index c0b1deef2..646fa869f 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -53,6 +54,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }() startPosition := b.getRequestPosition(req.GetInit()) + imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().Concurrency)) // connect to the follower var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient @@ -97,8 +99,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs glog.V(0).Infof("topic %v partition %v subscriber %s error: %v", t, partition, clientName, err) break } - lastOffset = ack.GetAck().Sequence - if subscribeFollowMeStream != nil { + imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence) + currentLastOffset := imt.GetOldest() + if subscribeFollowMeStream != nil && currentLastOffset > lastOffset { if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{ Message: &mq_pb.SubscribeFollowMeRequest_Ack{ Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{ @@ -110,6 +113,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs break } println("forwarding ack", lastOffset) + lastOffset = currentLastOffset } } if lastOffset > 0 { diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index 5286c229d..4bbb26032 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -37,7 +37,7 @@ func main() { } processorConfig := sub_client.ProcessorConfiguration{ - ConcurrentPartitionLimit: 3, + MaxPartitionCount: 3, } brokers := strings.Split(*seedBrokers, ",") diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index a5f87a3bb..ed710fa57 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -63,7 +63,8 @@ func main() { } processorConfig := sub_client.ProcessorConfiguration{ - ConcurrentPartitionLimit: 3, + MaxPartitionCount: 3, + PerPartitionConcurrency: 1, } brokers := strings.Split(*seedBrokers, ",") diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 094ce46ef..815694a48 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -51,7 +51,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId, Topic: sub.ContentConfig.Topic.ToPbTopic(), - MaxPartitionCount: sub.ProcessorConfig.ConcurrentPartitionLimit, + MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount, }, }, }); err != nil { @@ -107,6 +107,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig }, Filter: sub.ContentConfig.Filter, FollowerBroker: assigned.FollowerBroker, + Concurrency: sub.ProcessorConfig.PerPartitionConcurrency, }, }, });err != nil { @@ -124,7 +125,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig close(partitionOffsetChan) }() - concurrentPartitionLimit := int(sub.ProcessorConfig.ConcurrentPartitionLimit) + concurrentPartitionLimit := int(sub.ProcessorConfig.MaxPartitionCount) if concurrentPartitionLimit <= 0 { concurrentPartitionLimit = 1 } diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index 950d2214c..ba20cf040 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -29,7 +29,7 @@ func (sub *TopicSubscriber) startProcessors() { // listen to the messages from the sub coordinator // start one processor per partition var wg sync.WaitGroup - semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit) + semaphore := make(chan struct{}, sub.ProcessorConfig.MaxPartitionCount) for assigned := range sub.brokerPartitionAssignmentChan { wg.Add(1) diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index c7fa26739..95320b19a 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -22,7 +22,8 @@ type ContentConfiguration struct { } type ProcessorConfiguration struct { - ConcurrentPartitionLimit int32 // how many partitions to process concurrently + MaxPartitionCount int32 // how many partitions to process concurrently + PerPartitionConcurrency int32 // how many messages to process concurrently per partition } type OnEachMessageFunc func(key, value []byte) (err error) diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go index aa1f3f0c6..5e13ac427 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker.go @@ -61,6 +61,10 @@ func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bo return true } +func (imt *InflightMessageTracker) GetOldest() int64 { + return imt.timestamps.Oldest() +} + // RingBuffer represents a circular buffer to hold timestamps. type RingBuffer struct { buffer []int64 |
