diff options
Diffstat (limited to 'weed/mq/client/sub_client/process.go')
| -rw-r--r-- | weed/mq/client/sub_client/process.go | 4 |
1 files changed, 4 insertions, 0 deletions
diff --git a/weed/mq/client/sub_client/process.go b/weed/mq/client/sub_client/process.go index 7717a101f..b6bdb14ee 100644 --- a/weed/mq/client/sub_client/process.go +++ b/weed/mq/client/sub_client/process.go @@ -32,6 +32,9 @@ func (sub *TopicSubscriber) doProcess() error { RangeStop: brokerPartitionAssignment.Partition.RangeStop, }, Filter: sub.ContentConfig.Filter, + Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{ + StartTimestampNs: sub.alreadyProcessedTsNs, + }, }, }, }) @@ -68,6 +71,7 @@ func (sub *TopicSubscriber) doProcess() error { if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { return } + sub.alreadyProcessedTsNs = m.Data.TsNs case *mq_pb.SubscribeResponse_Ctrl: if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { return |
