Diffstat (limited to 'weed/mq/client')
-rw-r--r--  weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go         |  21
-rw-r--r--  weed/mq/client/cmd/weed_sub_record/subscriber_record.go |  22
-rw-r--r--  weed/mq/client/pub_client/publish.go                    |   1
-rw-r--r--  weed/mq/client/pub_client/scheduler.go                  |  10
-rw-r--r--  weed/mq/client/sub_client/connect_to_sub_coordinator.go | 118
-rw-r--r--  weed/mq/client/sub_client/on_each_partition.go          | 125
-rw-r--r--  weed/mq/client/sub_client/subscribe.go                  | 110
-rw-r--r--  weed/mq/client/sub_client/subscriber.go                 |  46
8 files changed, 297 insertions(+), 156 deletions(-)
diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
index adcdda04c..902e7ed1b 100644
--- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
+++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
@@ -14,9 +14,11 @@ import (
)
var (
- namespace = flag.String("ns", "test", "namespace")
- t = flag.String("topic", "test", "topic")
- seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
+ namespace = flag.String("ns", "test", "namespace")
+ t = flag.String("topic", "test", "topic")
+ seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
+ maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
+ perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
)
@@ -25,10 +27,11 @@ func main() {
flag.Parse()
subscriberConfig := &sub_client.SubscriberConfiguration{
- ClientId: fmt.Sprintf("client-%d", *clientId),
ConsumerGroup: "test",
ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId),
GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+ MaxPartitionCount: int32(*maxPartitionCount),
+ PerPartitionConcurrency: int32(*perPartitionConcurrency),
}
contentConfig := &sub_client.ContentConfiguration{
@@ -37,18 +40,14 @@ func main() {
StartTime: time.Unix(1, 1),
}
- processorConfig := sub_client.ProcessorConfiguration{
- ConcurrentPartitionLimit: 3,
- }
-
brokers := strings.Split(*seedBrokers, ",")
- subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
+ subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig)
counter := 0
- subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) {
+ subscriber.SetEachMessageFunc(func(key, value []byte) error {
counter++
println(string(key), "=>", string(value), counter)
- return true, nil
+ return nil
})
subscriber.SetCompletionFunc(func() {
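
Note: the per-message callback no longer returns a shouldContinue flag. Returning nil acknowledges the message and keeps consuming; returning a non-nil error stops the processor for that partition. A minimal handler under the new signature (the validation is illustrative, and fmt is assumed to be imported):

    func printMessage(key, value []byte) error {
        // nil acks the message and keeps consuming;
        // a non-nil error stops this partition's processor
        if len(key) == 0 {
            return fmt.Errorf("missing message key")
        }
        fmt.Printf("%s => %s\n", string(key), string(value))
        return nil
    }

Such a function can be passed directly with subscriber.SetEachMessageFunc(printMessage).
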
diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
index 53eb4f15b..674c881ba 100644
--- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
+++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
@@ -16,9 +16,11 @@ import (
)
var (
- namespace = flag.String("ns", "test", "namespace")
- t = flag.String("topic", "test", "topic")
- seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
+ namespace = flag.String("ns", "test", "namespace")
+ t = flag.String("topic", "test", "topic")
+ seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
+ maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
+ perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
)
@@ -51,10 +53,11 @@ func main() {
flag.Parse()
subscriberConfig := &sub_client.SubscriberConfiguration{
- ClientId: fmt.Sprintf("client-%d", *clientId),
ConsumerGroup: "test",
ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId),
GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+ MaxPartitionCount: int32(*maxPartitionCount),
+ PerPartitionConcurrency: int32(*perPartitionConcurrency),
}
contentConfig := &sub_client.ContentConfiguration{
@@ -63,20 +66,17 @@ func main() {
StartTime: time.Unix(1, 1),
}
- processorConfig := sub_client.ProcessorConfiguration{
- ConcurrentPartitionLimit: 3,
- }
-
brokers := strings.Split(*seedBrokers, ",")
- subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
+ subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig)
counter := 0
- subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) {
+ subscriber.SetEachMessageFunc(func(key, value []byte) error {
counter++
record := &schema_pb.RecordValue{}
proto.Unmarshal(value, record)
fmt.Printf("record: %v\n", record)
- return true, nil
+ time.Sleep(1300 * time.Millisecond)
+ return nil
})
subscriber.SetCompletionFunc(func() {
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index a25620de1..a85eec31f 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -51,6 +51,7 @@ func (p *TopicPublisher) FinishPublish() error {
TsNs: time.Now().UnixNano(),
Ctrl: &mq_pb.ControlMessage{
IsClose: true,
+ PublisherName: p.config.PublisherName,
},
})
}
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index 03377d653..df2270b2c 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -142,11 +142,11 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
if err = publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Init{
Init: &mq_pb.PublishMessageRequest_InitMessage{
- Topic: p.config.Topic.ToPbTopic(),
- Partition: job.Partition,
- AckInterval: 128,
- FollowerBrokers: job.FollowerBrokers,
- PublisherName: p.config.PublisherName,
+ Topic: p.config.Topic.ToPbTopic(),
+ Partition: job.Partition,
+ AckInterval: 128,
+ FollowerBroker: job.FollowerBroker,
+ PublisherName: p.config.PublisherName,
},
},
}); err != nil {
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 2f1330b5e..d05ddb960 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -2,12 +2,9 @@ package sub_client
import (
"context"
- "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "io"
- "sync"
"time"
)
@@ -51,6 +48,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(),
+ MaxPartitionCount: sub.SubscriberConfig.MaxPartitionCount,
},
},
}); err != nil {
@@ -58,6 +56,16 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
return err
}
+ go func() {
+ for reply := range sub.brokerPartitionAssignmentAckChan {
+ glog.V(0).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply)
+ if err := stream.Send(reply); err != nil {
+ glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err)
+ return
+ }
+ }
+ }()
+
// keep receiving messages from the sub coordinator
for {
resp, err := stream.Recv()
@@ -65,11 +73,8 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
glog.V(0).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err)
return err
}
- assignment := resp.GetAssignment()
- if assignment != nil {
- glog.V(0).Infof("subscriber %s receive assignment: %v", sub.ContentConfig.Topic, assignment)
- }
- sub.onEachAssignment(assignment)
+ sub.brokerPartitionAssignmentChan <- resp
+ glog.V(0).Infof("Received assignment: %+v", resp)
}
return nil
@@ -82,100 +87,3 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
time.Sleep(waitTime)
}
}
-
-func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCoordinatorResponse_Assignment) {
- if assignment == nil {
- return
- }
- // process each partition, with a concurrency limit
- var wg sync.WaitGroup
- semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
-
- for _, assigned := range assignment.PartitionAssignments {
- wg.Add(1)
- semaphore <- struct{}{}
- go func(assigned *mq_pb.BrokerPartitionAssignment) {
- defer wg.Done()
- defer func() { <-semaphore }()
- glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
- err := sub.onEachPartition(assigned)
- if err != nil {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
- }
- }(assigned)
- }
-
- wg.Wait()
-}
-
-func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
- // connect to the partition broker
- return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
- subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{
- Message: &mq_pb.SubscribeMessageRequest_Init{
- Init: &mq_pb.SubscribeMessageRequest_InitMessage{
- ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
- ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
- Topic: sub.ContentConfig.Topic.ToPbTopic(),
- PartitionOffset: &mq_pb.PartitionOffset{
- Partition: assigned.Partition,
- StartTsNs: sub.alreadyProcessedTsNs,
- StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
- },
- Filter: sub.ContentConfig.Filter,
- FollowerBrokers: assigned.FollowerBrokers,
- },
- },
- })
-
- if err != nil {
- return fmt.Errorf("create subscribe client: %v", err)
- }
-
- glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
-
- if sub.OnCompletionFunc != nil {
- defer sub.OnCompletionFunc()
- }
- defer func() {
- subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
- Message: &mq_pb.SubscribeMessageRequest_Ack{
- Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
- Sequence: 0,
- },
- },
- })
- subscribeClient.CloseSend()
- }()
-
- for {
- // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
- resp, err := subscribeClient.Recv()
- if err != nil {
- return fmt.Errorf("subscribe recv: %v", err)
- }
- if resp.Message == nil {
- glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
- continue
- }
- switch m := resp.Message.(type) {
- case *mq_pb.SubscribeMessageResponse_Data:
- shouldContinue, processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
- if processErr != nil {
- return fmt.Errorf("process error: %v", processErr)
- }
- sub.alreadyProcessedTsNs = m.Data.TsNs
- if !shouldContinue {
- return nil
- }
- case *mq_pb.SubscribeMessageResponse_Ctrl:
- // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
- if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
- return io.EOF
- }
- }
- }
-
- return nil
- })
-}
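
Note: doKeepConnectedToSubCoordinator no longer processes assignments inline. Every response from the coordinator is pushed onto brokerPartitionAssignmentChan, and a dedicated goroutine drains brokerPartitionAssignmentAckChan and sends those acknowledgements back over the same stream; the per-partition work moved to subscribe.go and on_each_partition.go. A minimal sketch of that send/receive split over a hypothetical stream interface (the names and types below are illustrative, not the mq_pb client API):

    // stream stands in for the bidirectional gRPC stream to the sub coordinator;
    // Send and Recv are the only methods the pattern relies on.
    type stream interface {
        Send(req string) error
        Recv() (string, error)
    }

    // pump forwards queued acknowledgements onto the stream and delivers every
    // received response to respCh, mirroring doKeepConnectedToSubCoordinator.
    func pump(s stream, ackCh <-chan string, respCh chan<- string) error {
        go func() {
            for ack := range ackCh {
                if err := s.Send(ack); err != nil {
                    return // stream is broken; the outer reconnect loop redials
                }
            }
        }()
        for {
            resp, err := s.Recv()
            if err != nil {
                return err // caller sleeps and reconnects, as the real code does
            }
            respCh <- resp
        }
    }
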
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
new file mode 100644
index 000000000..5dcac4eb3
--- /dev/null
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -0,0 +1,125 @@
+package sub_client
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "io"
+)
+
+func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error {
+ // connect to the partition broker
+ return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+
+ subscribeClient, err := client.SubscribeMessage(context.Background())
+ if err != nil {
+ return fmt.Errorf("create subscribe client: %v", err)
+ }
+
+ perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency
+ if perPartitionConcurrency <= 0 {
+ perPartitionConcurrency = 1
+ }
+
+ if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Init{
+ Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+ ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
+ ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
+ Topic: sub.ContentConfig.Topic.ToPbTopic(),
+ PartitionOffset: &mq_pb.PartitionOffset{
+ Partition: assigned.Partition,
+ StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
+ },
+ Filter: sub.ContentConfig.Filter,
+ FollowerBroker: assigned.FollowerBroker,
+ Concurrency: perPartitionConcurrency,
+ },
+ },
+ }); err != nil {
+ glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
+ }
+
+ glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
+
+ if sub.OnCompletionFunc != nil {
+ defer sub.OnCompletionFunc()
+ }
+
+ type KeyedOffset struct {
+ Key []byte
+ Offset int64
+ }
+
+ partitionOffsetChan := make(chan KeyedOffset, 1024)
+ defer func() {
+ close(partitionOffsetChan)
+ }()
+ executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ subscribeClient.CloseSend()
+ return
+ case ack, ok := <-partitionOffsetChan:
+ if !ok {
+ subscribeClient.CloseSend()
+ return
+ }
+ subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Ack{
+ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
+ Key: ack.Key,
+ Sequence: ack.Offset,
+ },
+ },
+ })
+ }
+ }
+ }()
+
+ var lastErr error
+
+ for lastErr == nil {
+ // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ resp, err := subscribeClient.Recv()
+ if err != nil {
+ return fmt.Errorf("subscribe recv: %v", err)
+ }
+ if resp.Message == nil {
+ glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ continue
+ }
+ switch m := resp.Message.(type) {
+ case *mq_pb.SubscribeMessageResponse_Data:
+ if m.Data.Ctrl != nil {
+ glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.IsClose)
+ continue
+ }
+ executors.Execute(func() {
+ processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
+ if processErr == nil {
+ partitionOffsetChan <- KeyedOffset{
+ Key: m.Data.Key,
+ Offset: m.Data.TsNs,
+ }
+ } else {
+ lastErr = processErr
+ }
+ })
+ case *mq_pb.SubscribeMessageResponse_Ctrl:
+ // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
+ if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
+ return io.EOF
+ }
+ }
+ }
+
+ return lastErr
+ })
+}
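
Note: message handling inside a partition is now fanned out through util.NewLimitedConcurrentExecutor, sized by PerPartitionConcurrency, and each successfully processed message enqueues its key and TsNs on partitionOffsetChan so a separate goroutine can send the Ack (Key plus Sequence) back to the broker. A stand-in for that bounded executor, shown only to illustrate the concurrency cap it provides (a sketch of the pattern, not the util package's implementation):

    // limitedExecutor runs submitted jobs concurrently, but never more than
    // `limit` at a time, which is the cap on_each_partition.go gets from
    // util.NewLimitedConcurrentExecutor(PerPartitionConcurrency).
    type limitedExecutor struct {
        tokens chan struct{}
    }

    func newLimitedExecutor(limit int) *limitedExecutor {
        if limit <= 0 {
            limit = 1
        }
        return &limitedExecutor{tokens: make(chan struct{}, limit)}
    }

    // Execute blocks while `limit` jobs are already in flight, then runs the job
    // in its own goroutine and releases the slot when the job finishes.
    func (e *limitedExecutor) Execute(job func()) {
        e.tokens <- struct{}{}
        go func() {
            defer func() { <-e.tokens }()
            job()
        }()
    }

Because up to PerPartitionConcurrency handlers run at once, acknowledgements can be emitted out of order across keys, which is presumably why each ack carries the message key alongside the timestamp.
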
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index df62ea674..5669bb348 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,11 +1,121 @@
package sub_client
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "sync"
+ "time"
+)
+
+type ProcessorState struct {
+ stopCh chan struct{}
+}
+
// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
func (sub *TopicSubscriber) Subscribe() error {
+
+ go sub.startProcessors()
+
// loop forever
sub.doKeepConnectedToSubCoordinator()
return nil
}
+
+func (sub *TopicSubscriber) startProcessors() {
+ // listen to the messages from the sub coordinator
+ // start one processor per partition
+ var wg sync.WaitGroup
+ semaphore := make(chan struct{}, sub.SubscriberConfig.MaxPartitionCount)
+
+ for message := range sub.brokerPartitionAssignmentChan {
+ if assigned := message.GetAssignment(); assigned != nil {
+ wg.Add(1)
+ semaphore <- struct{}{}
+
+ topicPartition := topic.FromPbPartition(assigned.PartitionAssignment.Partition)
+
+ // wait until no overlapping partition is still in progress
+ sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)
+
+ // start a processor for this partition
+ stopChan := make(chan struct{})
+ sub.activeProcessorsLock.Lock()
+ sub.activeProcessors[topicPartition] = &ProcessorState{
+ stopCh: stopChan,
+ }
+ sub.activeProcessorsLock.Unlock()
+
+ go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
+ defer func() {
+ sub.activeProcessorsLock.Lock()
+ delete(sub.activeProcessors, topicPartition)
+ sub.activeProcessorsLock.Unlock()
+
+ <-semaphore
+ wg.Done()
+ }()
+ glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
+ Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{
+ AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{
+ Partition: assigned.Partition,
+ },
+ },
+ }
+ err := sub.onEachPartition(assigned, stopChan)
+ if err != nil {
+ glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
+ } else {
+ glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ }
+ sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
+ Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{
+ AckUnAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage{
+ Partition: assigned.Partition,
+ },
+ },
+ }
+ }(assigned.PartitionAssignment, topicPartition)
+ }
+ if unAssignment := message.GetUnAssignment(); unAssignment != nil {
+ topicPartition := topic.FromPbPartition(unAssignment.Partition)
+ sub.activeProcessorsLock.Lock()
+ if processor, found := sub.activeProcessors[topicPartition]; found {
+ close(processor.stopCh)
+ delete(sub.activeProcessors, topicPartition)
+ }
+ sub.activeProcessorsLock.Unlock()
+ }
+ }
+
+ wg.Wait()
+
+}
+
+func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartition topic.Partition) {
+ foundOverlapping := true
+ for foundOverlapping {
+ sub.activeProcessorsLock.Lock()
+ foundOverlapping = false
+ var overlappedPartition topic.Partition
+ for partition := range sub.activeProcessors {
+ if partition.Overlaps(topicPartition) {
+ if partition.Equals(topicPartition) {
+ continue
+ }
+ foundOverlapping = true
+ overlappedPartition = partition
+ break
+ }
+ }
+ sub.activeProcessorsLock.Unlock()
+ if foundOverlapping {
+ glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
+ time.Sleep(1 * time.Second)
+ }
+ }
+}
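
Note: startProcessors caps concurrently processed partitions with a semaphore of size MaxPartitionCount, acknowledges each assignment back to the coordinator, and tears a processor down by closing its stop channel when an un-assignment arrives. Before starting a processor it waits until no processor for an overlapping (but different) partition is still running, which matters when partition ranges get split or merged. Assuming partitions are half-open ranges on the hash ring, the overlap test reduces to an interval check like the sketch below (not the topic package's actual Overlaps implementation):

    // rangesOverlap reports whether the half-open ring ranges [aStart, aStop)
    // and [bStart, bStop) share at least one slot.
    func rangesOverlap(aStart, aStop, bStart, bStop int32) bool {
        return aStart < bStop && bStart < aStop
    }
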
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 982c3f13b..922593b77 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -4,6 +4,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
+ "sync"
"time"
)
@@ -11,10 +12,9 @@ type SubscriberConfiguration struct {
ClientId string
ConsumerGroup string
ConsumerGroupInstanceId string
- GroupMinimumPeers int32
- GroupMaximumPeers int32
- BootstrapServers []string
GrpcDialOption grpc.DialOption
+ MaxPartitionCount int32 // how many partitions to process concurrently
+ PerPartitionConcurrency int32 // how many messages to process concurrently per partition
}
type ContentConfiguration struct {
@@ -23,33 +23,31 @@ type ContentConfiguration struct {
StartTime time.Time
}
-type ProcessorConfiguration struct {
- ConcurrentPartitionLimit int // how many partitions to process concurrently
-}
-
-type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error)
+type OnEachMessageFunc func(key, value []byte) (err error)
type OnCompletionFunc func()
type TopicSubscriber struct {
- SubscriberConfig *SubscriberConfiguration
- ContentConfig *ContentConfiguration
- ProcessorConfig *ProcessorConfiguration
- brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
- OnEachMessageFunc OnEachMessageFunc
- OnCompletionFunc OnCompletionFunc
- bootstrapBrokers []string
- waitForMoreMessage bool
- alreadyProcessedTsNs int64
+ SubscriberConfig *SubscriberConfiguration
+ ContentConfig *ContentConfiguration
+ brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse
+ brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest
+ OnEachMessageFunc OnEachMessageFunc
+ OnCompletionFunc OnCompletionFunc
+ bootstrapBrokers []string
+ waitForMoreMessage bool
+ activeProcessors map[topic.Partition]*ProcessorState
+ activeProcessorsLock sync.Mutex
}
-func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber {
+func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
return &TopicSubscriber{
- SubscriberConfig: subscriber,
- ContentConfig: content,
- ProcessorConfig: &processor,
- bootstrapBrokers: bootstrapBrokers,
- waitForMoreMessage: true,
- alreadyProcessedTsNs: content.StartTime.UnixNano(),
+ SubscriberConfig: subscriber,
+ ContentConfig: content,
+ brokerPartitionAssignmentChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1024),
+ brokerPartitionAssignmentAckChan: make(chan *mq_pb.SubscriberToSubCoordinatorRequest, 1024),
+ bootstrapBrokers: bootstrapBrokers,
+ waitForMoreMessage: true,
+ activeProcessors: make(map[topic.Partition]*ProcessorState),
}
}
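
Note: putting the pieces together, a minimal consumer against the new API looks roughly like the sketch below (broker address, topic, and limits are illustrative; topic.NewTopic is assumed for building the ContentConfiguration's Topic value, and error handling is trimmed):

    package main

    import (
        "fmt"
        "strings"

        "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
        "github.com/seaweedfs/seaweedfs/weed/mq/topic"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
    )

    func main() {
        subscriberConfig := &sub_client.SubscriberConfiguration{
            ConsumerGroup:           "test",
            ConsumerGroupInstanceId: "client-1",
            GrpcDialOption:          grpc.WithTransportCredentials(insecure.NewCredentials()),
            MaxPartitionCount:       3, // partitions processed concurrently
            PerPartitionConcurrency: 1, // handlers running concurrently per partition
        }
        contentConfig := &sub_client.ContentConfiguration{
            Topic: topic.NewTopic("test", "test"),
        }
        brokers := strings.Split("localhost:17777", ",")
        subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig)
        subscriber.SetEachMessageFunc(func(key, value []byte) error {
            fmt.Printf("%s => %s\n", string(key), string(value))
            return nil // nil acks the message; an error stops this partition's processor
        })
        subscriber.SetCompletionFunc(func() {
            fmt.Println("partition processor completed")
        })
        if err := subscriber.Subscribe(); err != nil {
            fmt.Printf("subscribe: %v\n", err)
        }
    }
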