aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/on_each_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/on_each_partition.go')
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go13
1 files changed, 7 insertions, 6 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index b6d6e90b5..470e886d2 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -4,16 +4,17 @@ import (
"context"
"errors"
"fmt"
+ "io"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "io"
)
-type KeyedOffset struct {
- Key []byte
- Offset int64
+type KeyedTimestamp struct {
+ Key []byte
+ TsNs int64 // Timestamp in nanoseconds for acknowledgment
}
func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}, onDataMessageFn OnDataMessageFn) error {
@@ -78,8 +79,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
- Key: ack.Key,
- Sequence: ack.Offset,
+ Key: ack.Key,
+ TsNs: ack.TsNs,
},
},
})