aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/subscriber.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/subscriber.go')
-rw-r--r--weed/mq/client/sub_client/subscriber.go46
1 files changed, 22 insertions, 24 deletions
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 982c3f13b..922593b77 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -4,6 +4,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
+ "sync"
"time"
)
@@ -11,10 +12,9 @@ type SubscriberConfiguration struct {
ClientId string
ConsumerGroup string
ConsumerGroupInstanceId string
- GroupMinimumPeers int32
- GroupMaximumPeers int32
- BootstrapServers []string
GrpcDialOption grpc.DialOption
+ MaxPartitionCount int32 // how many partitions to process concurrently
+ PerPartitionConcurrency int32 // how many messages to process concurrently per partition
}
type ContentConfiguration struct {
@@ -23,33 +23,31 @@ type ContentConfiguration struct {
StartTime time.Time
}
-type ProcessorConfiguration struct {
- ConcurrentPartitionLimit int // how many partitions to process concurrently
-}
-
-type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error)
+type OnEachMessageFunc func(key, value []byte) (err error)
type OnCompletionFunc func()
type TopicSubscriber struct {
- SubscriberConfig *SubscriberConfiguration
- ContentConfig *ContentConfiguration
- ProcessorConfig *ProcessorConfiguration
- brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
- OnEachMessageFunc OnEachMessageFunc
- OnCompletionFunc OnCompletionFunc
- bootstrapBrokers []string
- waitForMoreMessage bool
- alreadyProcessedTsNs int64
+ SubscriberConfig *SubscriberConfiguration
+ ContentConfig *ContentConfiguration
+ brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse
+ brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest
+ OnEachMessageFunc OnEachMessageFunc
+ OnCompletionFunc OnCompletionFunc
+ bootstrapBrokers []string
+ waitForMoreMessage bool
+ activeProcessors map[topic.Partition]*ProcessorState
+ activeProcessorsLock sync.Mutex
}
-func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber {
+func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
return &TopicSubscriber{
- SubscriberConfig: subscriber,
- ContentConfig: content,
- ProcessorConfig: &processor,
- bootstrapBrokers: bootstrapBrokers,
- waitForMoreMessage: true,
- alreadyProcessedTsNs: content.StartTime.UnixNano(),
+ SubscriberConfig: subscriber,
+ ContentConfig: content,
+ brokerPartitionAssignmentChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1024),
+ brokerPartitionAssignmentAckChan: make(chan *mq_pb.SubscriberToSubCoordinatorRequest, 1024),
+ bootstrapBrokers: bootstrapBrokers,
+ waitForMoreMessage: true,
+ activeProcessors: make(map[topic.Partition]*ProcessorState),
}
}