aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/client/subscriber.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-30 02:19:51 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-30 02:19:51 -0700
commit8c73410a51441d7f9f1140a8996dd3eb1f191f2e (patch)
tree87e7a9cca2509abf25004b3150c52399b20a2ec5 /weed/messaging/client/subscriber.go
parent4e16a90454f85a974e2e2c753594663bea50bf4f (diff)
downloadseaweedfs-8c73410a51441d7f9f1140a8996dd3eb1f191f2e.tar.xz
seaweedfs-8c73410a51441d7f9f1140a8996dd3eb1f191f2e.zip
subscribe from a timestamp
Diffstat (limited to 'weed/messaging/client/subscriber.go')
-rw-r--r--weed/messaging/client/subscriber.go8
1 files changed, 4 insertions, 4 deletions
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go
index 2ebad4ce6..53e7ffc7d 100644
--- a/weed/messaging/client/subscriber.go
+++ b/weed/messaging/client/subscriber.go
@@ -13,7 +13,7 @@ type Subscriber struct {
subscriberId string
}
-func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) {
+func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
// read topic configuration
topicConfiguration := &messaging_pb.TopicConfiguration{
PartitionCount: 4,
@@ -21,7 +21,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string)
subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i))
+ client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime)
if err != nil {
return nil, err
}
@@ -34,7 +34,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string)
}, nil
}
-func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
+func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background())
if err != nil {
@@ -48,7 +48,7 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic
Topic: topic,
Partition: partition,
StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
- TimestampNs: time.Now().UnixNano(),
+ TimestampNs: startTime.UnixNano(),
SubscriberId: subscriberId,
},
})