aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-20 09:33:37 -0700
committerchrislu <chris.lu@gmail.com>2024-05-20 09:33:37 -0700
commitd8ab22012a1bcaa49de557f05808da3deddcfb1f (patch)
tree4129ae83b0adaa65dc63da7b83c1a46fd22d5578 /weed/mq
parent2b07a40da5893cba380ac2d9a51e550af6b73e5a (diff)
downloadseaweedfs-d8ab22012a1bcaa49de557f05808da3deddcfb1f.tar.xz
seaweedfs-d8ab22012a1bcaa49de557f05808da3deddcfb1f.zip
track offset
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker/broker_grpc_sub.go8
-rw-r--r--weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go2
-rw-r--r--weed/mq/client/cmd/weed_sub_record/subscriber_record.go3
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go5
-rw-r--r--weed/mq/client/sub_client/subscribe.go2
-rw-r--r--weed/mq/client/sub_client/subscriber.go3
-rw-r--r--weed/mq/sub_coordinator/inflight_message_tracker.go4
7 files changed, 19 insertions, 8 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index c0b1deef2..646fa869f 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -53,6 +54,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}()
startPosition := b.getRequestPosition(req.GetInit())
+ imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().Concurrency))
// connect to the follower
var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
@@ -97,8 +99,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
glog.V(0).Infof("topic %v partition %v subscriber %s error: %v", t, partition, clientName, err)
break
}
- lastOffset = ack.GetAck().Sequence
- if subscribeFollowMeStream != nil {
+ imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
+ currentLastOffset := imt.GetOldest()
+ if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Ack{
Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{
@@ -110,6 +113,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
break
}
println("forwarding ack", lastOffset)
+ lastOffset = currentLastOffset
}
}
if lastOffset > 0 {
diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
index 5286c229d..4bbb26032 100644
--- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
+++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
@@ -37,7 +37,7 @@ func main() {
}
processorConfig := sub_client.ProcessorConfiguration{
- ConcurrentPartitionLimit: 3,
+ MaxPartitionCount: 3,
}
brokers := strings.Split(*seedBrokers, ",")
diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
index a5f87a3bb..ed710fa57 100644
--- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
+++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
@@ -63,7 +63,8 @@ func main() {
}
processorConfig := sub_client.ProcessorConfiguration{
- ConcurrentPartitionLimit: 3,
+ MaxPartitionCount: 3,
+ PerPartitionConcurrency: 1,
}
brokers := strings.Split(*seedBrokers, ",")
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 094ce46ef..815694a48 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -51,7 +51,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(),
- MaxPartitionCount: sub.ProcessorConfig.ConcurrentPartitionLimit,
+ MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount,
},
},
}); err != nil {
@@ -107,6 +107,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
},
Filter: sub.ContentConfig.Filter,
FollowerBroker: assigned.FollowerBroker,
+ Concurrency: sub.ProcessorConfig.PerPartitionConcurrency,
},
},
});err != nil {
@@ -124,7 +125,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
close(partitionOffsetChan)
}()
- concurrentPartitionLimit := int(sub.ProcessorConfig.ConcurrentPartitionLimit)
+ concurrentPartitionLimit := int(sub.ProcessorConfig.MaxPartitionCount)
if concurrentPartitionLimit <= 0 {
concurrentPartitionLimit = 1
}
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index 950d2214c..ba20cf040 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -29,7 +29,7 @@ func (sub *TopicSubscriber) startProcessors() {
// listen to the messages from the sub coordinator
// start one processor per partition
var wg sync.WaitGroup
- semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
+ semaphore := make(chan struct{}, sub.ProcessorConfig.MaxPartitionCount)
for assigned := range sub.brokerPartitionAssignmentChan {
wg.Add(1)
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index c7fa26739..95320b19a 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -22,7 +22,8 @@ type ContentConfiguration struct {
}
type ProcessorConfiguration struct {
- ConcurrentPartitionLimit int32 // how many partitions to process concurrently
+ MaxPartitionCount int32 // how many partitions to process concurrently
+ PerPartitionConcurrency int32 // how many messages to process concurrently per partition
}
type OnEachMessageFunc func(key, value []byte) (err error)
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go
index aa1f3f0c6..5e13ac427 100644
--- a/weed/mq/sub_coordinator/inflight_message_tracker.go
+++ b/weed/mq/sub_coordinator/inflight_message_tracker.go
@@ -61,6 +61,10 @@ func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bo
return true
}
+func (imt *InflightMessageTracker) GetOldest() int64 {
+ return imt.timestamps.Oldest()
+}
+
// RingBuffer represents a circular buffer to hold timestamps.
type RingBuffer struct {
buffer []int64