diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-30 02:19:51 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-30 02:19:51 -0700 |
| commit | 8c73410a51441d7f9f1140a8996dd3eb1f191f2e (patch) | |
| tree | 87e7a9cca2509abf25004b3150c52399b20a2ec5 /weed/messaging/client/subscriber.go | |
| parent | 4e16a90454f85a974e2e2c753594663bea50bf4f (diff) | |
| download | seaweedfs-8c73410a51441d7f9f1140a8996dd3eb1f191f2e.tar.xz seaweedfs-8c73410a51441d7f9f1140a8996dd3eb1f191f2e.zip | |
subscribe from a timestamp
Diffstat (limited to 'weed/messaging/client/subscriber.go')
| -rw-r--r-- | weed/messaging/client/subscriber.go | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go index 2ebad4ce6..53e7ffc7d 100644 --- a/weed/messaging/client/subscriber.go +++ b/weed/messaging/client/subscriber.go @@ -13,7 +13,7 @@ type Subscriber struct { subscriberId string } -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) { +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ PartitionCount: 4, @@ -21,7 +21,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i)) + client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime) if err != nil { return nil, err } @@ -34,7 +34,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) }, nil } -func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { +func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) if err != nil { @@ -48,7 +48,7 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic Topic: topic, Partition: partition, StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, - TimestampNs: time.Now().UnixNano(), + TimestampNs: startTime.UnixNano(), SubscriberId: subscriberId, }, }) |
