aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/subscriber.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/subscriber.go')
-rw-r--r--weed/mq/client/sub_client/subscriber.go20
1 files changed, 13 insertions, 7 deletions
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 809673de1..9b96b14cb 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -3,6 +3,7 @@ package sub_client
import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
+ "time"
)
type SubscriberConfiguration struct {
@@ -19,6 +20,7 @@ type ContentConfiguration struct {
Namespace string
Topic string
Filter string
+ StartTime time.Time
}
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool)
@@ -30,14 +32,18 @@ type TopicSubscriber struct {
brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
- bootstrapBroker string
+ bootstrapBrokers []string
+ waitForMoreMessage bool
+ alreadyProcessedTsNs int64
}
-func NewTopicSubscriber(bootstrapBroker string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
+func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
return &TopicSubscriber{
- SubscriberConfig: subscriber,
- ContentConfig: content,
- bootstrapBroker: bootstrapBroker,
+ SubscriberConfig: subscriber,
+ ContentConfig: content,
+ bootstrapBrokers: bootstrapBrokers,
+ waitForMoreMessage: true,
+ alreadyProcessedTsNs: content.StartTime.UnixNano(),
}
}
@@ -45,6 +51,6 @@ func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc
sub.OnEachMessageFunc = onEachMessageFn
}
-func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) {
- sub.OnCompletionFunc = onCompeletionFn
+func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {
+ sub.OnCompletionFunc = onCompletionFn
}