diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-03 15:57:36 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-03 15:57:36 -0800 |
| commit | 47a4963d7ae1b827b51fe1d0c0748ad6a31fcae5 (patch) | |
| tree | 3b610e0af01c2fe4509a9505b5c0614ca2b7d0ae | |
| parent | 35869b5c8090af92475fadf4766892f4db5adf7b (diff) | |
| download | seaweedfs-47a4963d7ae1b827b51fe1d0c0748ad6a31fcae5.tar.xz seaweedfs-47a4963d7ae1b827b51fe1d0c0748ad6a31fcae5.zip | |
subscription start from specified timestamp
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 4 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_sub/subscriber.go | 2 |
2 files changed, 2 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index c98ce4684..ecf771b9f 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -38,11 +38,9 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb }() ctx := stream.Context() - var startTime time.Time + startTime := time.Now() if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 { startTime = time.Unix(0, startTs) - } else { - startTime = time.Now() } localTopicPartition.Subscribe(clientName, startTime, func() bool { diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go index 310e5ac78..7488e60f0 100644 --- a/weed/mq/client/cmd/weed_sub/subscriber.go +++ b/weed/mq/client/cmd/weed_sub/subscriber.go @@ -30,7 +30,7 @@ func main() { Namespace: *namespace, Topic: *topic, Filter: "", - StartTime: time.Unix(0, 0), + StartTime: time.Unix(1, 1), } processorConfig := sub_client.ProcessorConfiguration{ |
