diff options
Diffstat (limited to 'weed/mq/client/sub_client/subscriber.go')
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 9b96b14cb..982c3f13b 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -1,34 +1,39 @@ package sub_client import ( + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" "time" ) type SubscriberConfiguration struct { - ClientId string - GroupId string - GroupInstanceId string - GroupMinimumPeers int32 - GroupMaximumPeers int32 - BootstrapServers []string - GrpcDialOption grpc.DialOption + ClientId string + ConsumerGroup string + ConsumerGroupInstanceId string + GroupMinimumPeers int32 + GroupMaximumPeers int32 + BootstrapServers []string + GrpcDialOption grpc.DialOption } type ContentConfiguration struct { - Namespace string - Topic string + Topic topic.Topic Filter string StartTime time.Time } -type OnEachMessageFunc func(key, value []byte) (shouldContinue bool) +type ProcessorConfiguration struct { + ConcurrentPartitionLimit int // how many partitions to process concurrently +} + +type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error) type OnCompletionFunc func() type TopicSubscriber struct { SubscriberConfig *SubscriberConfiguration ContentConfig *ContentConfiguration + ProcessorConfig *ProcessorConfiguration brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment OnEachMessageFunc OnEachMessageFunc OnCompletionFunc OnCompletionFunc @@ -37,10 +42,11 @@ type TopicSubscriber struct { alreadyProcessedTsNs int64 } -func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { +func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber { return &TopicSubscriber{ SubscriberConfig: subscriber, ContentConfig: content, + ProcessorConfig: &processor, bootstrapBrokers: bootstrapBrokers, waitForMoreMessage: true, alreadyProcessedTsNs: content.StartTime.UnixNano(), |
