diff options
Diffstat (limited to 'weed/mq/client/sub_client/on_each_partition.go')
| -rw-r--r-- | weed/mq/client/sub_client/on_each_partition.go | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index b6d6e90b5..470e886d2 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -4,16 +4,17 @@ import ( "context" "errors" "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" - "io" ) -type KeyedOffset struct { - Key []byte - Offset int64 +type KeyedTimestamp struct { + Key []byte + TsNs int64 // Timestamp in nanoseconds for acknowledgment } func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}, onDataMessageFn OnDataMessageFn) error { @@ -78,8 +79,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Ack{ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ - Key: ack.Key, - Sequence: ack.Offset, + Key: ack.Key, + TsNs: ack.TsNs, }, }, }) |
