aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-20 11:09:48 -0700
committerchrislu <chris.lu@gmail.com>2024-05-20 11:09:48 -0700
commitf13475ac2e046142fde740b45e3f8e88c1ed9cef (patch)
tree4cb3156f220a6f487820573ba78eb1d0f5b9f3f0
parentd45b1d058db19d7adbe258147434a391eb1ceb82 (diff)
downloadseaweedfs-f13475ac2e046142fde740b45e3f8e88c1ed9cef.tar.xz
seaweedfs-f13475ac2e046142fde740b45e3f8e88c1ed9cef.zip
sending keyed offset
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go15
1 files changed, 12 insertions, 3 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 53ac27418..84af3a19c 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -125,7 +125,12 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
defer sub.OnCompletionFunc()
}
- partitionOffsetChan := make(chan int64, 1024)
+ type KeyedOffset struct {
+ Key []byte
+ Offset int64
+ }
+
+ partitionOffsetChan := make(chan KeyedOffset, 1024)
defer func() {
close(partitionOffsetChan)
}()
@@ -136,7 +141,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
- Sequence: ack,
+ Key: ack.Key,
+ Sequence: ack.Offset,
},
},
})
@@ -161,7 +167,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
executors.Execute(func() {
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
if processErr == nil {
- partitionOffsetChan <- m.Data.TsNs
+ partitionOffsetChan <- KeyedOffset{
+ Key: m.Data.Key,
+ Offset: m.Data.TsNs,
+ }
} else {
lastErr = processErr
}