aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-14 23:22:43 -0700
committerchrislu <chris.lu@gmail.com>2024-05-14 23:22:43 -0700
commit1f20178dedb493412397167e6ad8a7ef5b5e4b90 (patch)
tree3766df82dfd8fa3396248b1110f876805d86edcc
parent972e9faaa2f6fac677b79bcec1ba17321c267b37 (diff)
downloadseaweedfs-1f20178dedb493412397167e6ad8a7ef5b5e4b90.tar.xz
seaweedfs-1f20178dedb493412397167e6ad8a7ef5b5e4b90.zip
subscriber receives partitions and dispatch to processors
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go30
-rw-r--r--weed/mq/client/sub_client/subscribe.go77
-rw-r--r--weed/mq/client/sub_client/subscriber.go23
-rw-r--r--weed/mq/topic/partition.go10
4 files changed, 104 insertions, 36 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index ef7841725..328968c89 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -7,7 +7,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
- "sync"
"time"
)
@@ -69,8 +68,10 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
assignment := resp.GetAssignment()
if assignment != nil {
glog.V(0).Infof("subscriber %s receive assignment: %v", sub.ContentConfig.Topic, assignment)
+ for _, assignedPartition := range assignment.PartitionAssignments {
+ sub.brokerPartitionAssignmentChan <- assignedPartition
+ }
}
- sub.onEachAssignment(assignment)
}
return nil
@@ -84,31 +85,6 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
}
}
-func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCoordinatorResponse_Assignment) {
- if assignment == nil {
- return
- }
- // process each partition, with a concurrency limit
- var wg sync.WaitGroup
- semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
-
- for _, assigned := range assignment.PartitionAssignments {
- wg.Add(1)
- semaphore <- struct{}{}
- go func(assigned *mq_pb.BrokerPartitionAssignment) {
- defer wg.Done()
- defer func() { <-semaphore }()
- glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
- err := sub.onEachPartition(assigned)
- if err != nil {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
- }
- }(assigned)
- }
-
- wg.Wait()
-}
-
func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
// connect to the partition broker
return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index df62ea674..b788faeb5 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,11 +1,88 @@
package sub_client
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "sync"
+ "time"
+)
+
+type ProcessorState struct {
+
+}
+
// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
func (sub *TopicSubscriber) Subscribe() error {
+
+ go sub.startProcessors()
+
// loop forever
sub.doKeepConnectedToSubCoordinator()
return nil
}
+
+func (sub *TopicSubscriber) startProcessors() {
+ // listen to the messages from the sub coordinator
+ // start one processor per partition
+ var wg sync.WaitGroup
+ semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
+
+ for assigned := range sub.brokerPartitionAssignmentChan {
+ wg.Add(1)
+ semaphore <- struct{}{}
+
+ topicPartition := topic.FromPbPartition(assigned.Partition)
+
+ // wait until no covering partition is still in progress
+ sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)
+
+ // start a processors
+ sub.activeProcessorsLock.Lock()
+ sub.activeProcessors[topicPartition] = &ProcessorState{}
+ sub.activeProcessorsLock.Unlock()
+
+ go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
+ defer func() {
+ sub.activeProcessorsLock.Lock()
+ delete(sub.activeProcessors, topicPartition)
+ sub.activeProcessorsLock.Unlock()
+
+ <-semaphore
+ wg.Done()
+ }()
+ glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ err := sub.onEachPartition(assigned)
+ if err != nil {
+ glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
+ } else {
+ glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ }
+ }(assigned, topicPartition)
+ }
+
+ wg.Wait()
+
+}
+
+func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartition topic.Partition) {
+ foundOverlapping := true
+ for foundOverlapping {
+ sub.activeProcessorsLock.Lock()
+ foundOverlapping = false
+ for partition, _ := range sub.activeProcessors {
+ if partition.Overlaps(topicPartition) {
+ foundOverlapping = true
+ break
+ }
+ }
+ sub.activeProcessorsLock.Unlock()
+ if foundOverlapping {
+ glog.V(0).Infof("subscriber %s/%s waiting for partition %+v to complete", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, topicPartition)
+ time.Sleep(1 * time.Second)
+ }
+ }
+}
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 723520c1d..a84791f0a 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -4,6 +4,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
+ "sync"
"time"
)
@@ -30,23 +31,27 @@ type OnCompletionFunc func()
type TopicSubscriber struct {
SubscriberConfig *SubscriberConfiguration
ContentConfig *ContentConfiguration
- ProcessorConfig *ProcessorConfiguration
- brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
- OnEachMessageFunc OnEachMessageFunc
+ ProcessorConfig *ProcessorConfiguration
+ brokerPartitionAssignmentChan chan *mq_pb.BrokerPartitionAssignment
+ OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
bootstrapBrokers []string
waitForMoreMessage bool
alreadyProcessedTsNs int64
+ activeProcessors map[topic.Partition]*ProcessorState
+ activeProcessorsLock sync.Mutex
}
func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber {
return &TopicSubscriber{
- SubscriberConfig: subscriber,
- ContentConfig: content,
- ProcessorConfig: &processor,
- bootstrapBrokers: bootstrapBrokers,
- waitForMoreMessage: true,
- alreadyProcessedTsNs: content.StartTime.UnixNano(),
+ SubscriberConfig: subscriber,
+ ContentConfig: content,
+ ProcessorConfig: &processor,
+ brokerPartitionAssignmentChan: make(chan *mq_pb.BrokerPartitionAssignment, 1024),
+ bootstrapBrokers: bootstrapBrokers,
+ waitForMoreMessage: true,
+ alreadyProcessedTsNs: content.StartTime.UnixNano(),
+ activeProcessors: make(map[topic.Partition]*ProcessorState),
}
}
diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go
index 45b55c43b..ba1accce1 100644
--- a/weed/mq/topic/partition.go
+++ b/weed/mq/topic/partition.go
@@ -71,3 +71,13 @@ func (partition Partition) ToPbPartition() *mq_pb.Partition {
UnixTimeNs: partition.UnixTimeNs,
}
}
+
+func (partition Partition) Overlaps(partition2 Partition) bool {
+ if partition.RangeStart >= partition2.RangeStop {
+ return false
+ }
+ if partition.RangeStop <= partition2.RangeStart {
+ return false
+ }
+ return true
+}