aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-03 15:57:36 -0800
committerchrislu <chris.lu@gmail.com>2024-01-03 15:57:36 -0800
commit47a4963d7ae1b827b51fe1d0c0748ad6a31fcae5 (patch)
tree3b610e0af01c2fe4509a9505b5c0614ca2b7d0ae
parent35869b5c8090af92475fadf4766892f4db5adf7b (diff)
downloadseaweedfs-47a4963d7ae1b827b51fe1d0c0748ad6a31fcae5.tar.xz
seaweedfs-47a4963d7ae1b827b51fe1d0c0748ad6a31fcae5.zip
subscription start from specified timestamp
-rw-r--r--weed/mq/broker/broker_grpc_sub.go4
-rw-r--r--weed/mq/client/cmd/weed_sub/subscriber.go2
2 files changed, 2 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index c98ce4684..ecf771b9f 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -38,11 +38,9 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb
}()
ctx := stream.Context()
- var startTime time.Time
+ startTime := time.Now()
if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 {
startTime = time.Unix(0, startTs)
- } else {
- startTime = time.Now()
}
localTopicPartition.Subscribe(clientName, startTime, func() bool {
diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go
index 310e5ac78..7488e60f0 100644
--- a/weed/mq/client/cmd/weed_sub/subscriber.go
+++ b/weed/mq/client/cmd/weed_sub/subscriber.go
@@ -30,7 +30,7 @@ func main() {
Namespace: *namespace,
Topic: *topic,
Filter: "",
- StartTime: time.Unix(0, 0),
+ StartTime: time.Unix(1, 1),
}
processorConfig := sub_client.ProcessorConfiguration{