diff options
Diffstat (limited to 'weed/mq/client/sub_client/subscriber.go')
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 46 |
1 files changed, 22 insertions, 24 deletions
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 982c3f13b..922593b77 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -4,6 +4,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" + "sync" "time" ) @@ -11,10 +12,9 @@ type SubscriberConfiguration struct { ClientId string ConsumerGroup string ConsumerGroupInstanceId string - GroupMinimumPeers int32 - GroupMaximumPeers int32 - BootstrapServers []string GrpcDialOption grpc.DialOption + MaxPartitionCount int32 // how many partitions to process concurrently + PerPartitionConcurrency int32 // how many messages to process concurrently per partition } type ContentConfiguration struct { @@ -23,33 +23,31 @@ type ContentConfiguration struct { StartTime time.Time } -type ProcessorConfiguration struct { - ConcurrentPartitionLimit int // how many partitions to process concurrently -} - -type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error) +type OnEachMessageFunc func(key, value []byte) (err error) type OnCompletionFunc func() type TopicSubscriber struct { - SubscriberConfig *SubscriberConfiguration - ContentConfig *ContentConfiguration - ProcessorConfig *ProcessorConfiguration - brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment - OnEachMessageFunc OnEachMessageFunc - OnCompletionFunc OnCompletionFunc - bootstrapBrokers []string - waitForMoreMessage bool - alreadyProcessedTsNs int64 + SubscriberConfig *SubscriberConfiguration + ContentConfig *ContentConfiguration + brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse + brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest + OnEachMessageFunc OnEachMessageFunc + OnCompletionFunc OnCompletionFunc + bootstrapBrokers []string + waitForMoreMessage bool + activeProcessors map[topic.Partition]*ProcessorState + activeProcessorsLock sync.Mutex } -func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber { +func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { return &TopicSubscriber{ - SubscriberConfig: subscriber, - ContentConfig: content, - ProcessorConfig: &processor, - bootstrapBrokers: bootstrapBrokers, - waitForMoreMessage: true, - alreadyProcessedTsNs: content.StartTime.UnixNano(), + SubscriberConfig: subscriber, + ContentConfig: content, + brokerPartitionAssignmentChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1024), + brokerPartitionAssignmentAckChan: make(chan *mq_pb.SubscriberToSubCoordinatorRequest, 1024), + bootstrapBrokers: bootstrapBrokers, + waitForMoreMessage: true, + activeProcessors: make(map[topic.Partition]*ProcessorState), } } |
