aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/process.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/process.go')
-rw-r--r--weed/mq/client/sub_client/process.go4
1 files changed, 4 insertions, 0 deletions
diff --git a/weed/mq/client/sub_client/process.go b/weed/mq/client/sub_client/process.go
index 7717a101f..b6bdb14ee 100644
--- a/weed/mq/client/sub_client/process.go
+++ b/weed/mq/client/sub_client/process.go
@@ -32,6 +32,9 @@ func (sub *TopicSubscriber) doProcess() error {
RangeStop: brokerPartitionAssignment.Partition.RangeStop,
},
Filter: sub.ContentConfig.Filter,
+ Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{
+ StartTimestampNs: sub.alreadyProcessedTsNs,
+ },
},
},
})
@@ -68,6 +71,7 @@ func (sub *TopicSubscriber) doProcess() error {
if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) {
return
}
+ sub.alreadyProcessedTsNs = m.Data.TsNs
case *mq_pb.SubscribeResponse_Ctrl:
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return