Diffstat (limited to 'weed')
-rw-r--r--  weed/mq/client/sub_client/subscribe.go  |  5
-rw-r--r--  weed/mq/client/sub_client/subscriber.go | 12
-rw-r--r--  weed/mq/coordinator/consumer_group.go   | 92
-rw-r--r--  weed/mq/coordinator/coordinator.go      | 36
-rw-r--r--  weed/mq/topic/partition.go              | 17
5 files changed, 157 insertions, 5 deletions
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index bfde6a512..0803b2c79 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -11,9 +11,14 @@ import (
 func (sub *TopicSubscriber) Subscribe() error {
     util.RetryUntil("subscribe", func() error {
+        // ask balancer for brokers of the topic
         if err := sub.doLookup(sub.bootstrapBroker); err != nil {
             return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
         }
+        // treat the first broker as the topic leader
+        // connect to the leader broker
+
+        // subscribe to the topic
         if err := sub.doProcess(); err != nil {
             return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
         }
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index f744c6fa2..809673de1 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -6,11 +6,13 @@ import (
 )

 type SubscriberConfiguration struct {
-    ClientId         string
-    GroupId          string
-    GroupInstanceId  string
-    BootstrapServers []string
-    GrpcDialOption   grpc.DialOption
+    ClientId          string
+    GroupId           string
+    GroupInstanceId   string
+    GroupMinimumPeers int32
+    GroupMaximumPeers int32
+    BootstrapServers  []string
+    GrpcDialOption    grpc.DialOption
 }

 type ContentConfiguration struct {
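For context, the two new fields let a subscriber declare how many active peer instances its consumer group should run with; within this diff they are only declared, not yet consumed anywhere. A minimal sketch of filling in the configuration (the field names come from the struct above; the import path, the port, and the insecure gRPC dial option are illustrative assumptions, not something this commit prescribes):

package main

import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
)

func main() {
    // Sketch only: the concrete values and the insecure transport option are placeholders.
    config := &sub_client.SubscriberConfiguration{
        ClientId:          "client-1",
        GroupId:           "group-a",
        GroupInstanceId:   "group-a-instance-1",
        GroupMinimumPeers: 2, // ask the coordinator to keep at least 2 active instances
        GroupMaximumPeers: 4, // and no more than 4
        BootstrapServers:  []string{"localhost:17777"},
        GrpcDialOption:    grpc.WithTransportCredentials(insecure.NewCredentials()),
    }
    _ = config
}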
diff --git a/weed/mq/coordinator/consumer_group.go b/weed/mq/coordinator/consumer_group.go
new file mode 100644
index 000000000..e3dec493c
--- /dev/null
+++ b/weed/mq/coordinator/consumer_group.go
@@ -0,0 +1,92 @@
+package coordinator
+
+import (
+    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+    "sync"
+)
+
+func (cg *ConsumerGroup) SetMinMaxActiveInstances(min, max int32) {
+    cg.MinimumActiveInstances = min
+    cg.MaximumActiveInstances = max
+}
+
+func (cg *ConsumerGroup) AddConsumerGroupInstance(clientId string) *ConsumerGroupInstance {
+    cgi := &ConsumerGroupInstance{
+        ClientId: clientId,
+    }
+    cg.ConsumerGroupInstances.Set(clientId, cgi)
+    return cgi
+}
+
+func (cg *ConsumerGroup) RemoveConsumerGroupInstance(clientId string) {
+    cg.ConsumerGroupInstances.Remove(clientId)
+}
+
+func (cg *ConsumerGroup) CoordinateIfNeeded() {
+    emptyInstanceCount, activeInstanceCount := int32(0), int32(0)
+    for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
+        if cgi.Val.Partition == nil {
+            // this consumer group instance is not assigned a partition
+            // need to assign one
+            emptyInstanceCount++
+        } else {
+            activeInstanceCount++
+        }
+    }
+
+    var delta int32
+    if emptyInstanceCount > 0 {
+        if cg.MinimumActiveInstances <= 0 {
+            // need to assign more partitions
+            delta = emptyInstanceCount
+        } else if activeInstanceCount < cg.MinimumActiveInstances && activeInstanceCount+emptyInstanceCount >= cg.MinimumActiveInstances {
+            // need to assign more partitions
+            delta = cg.MinimumActiveInstances - activeInstanceCount
+        }
+    }
+
+    if cg.MaximumActiveInstances > 0 {
+        if activeInstanceCount > cg.MaximumActiveInstances {
+            // need to remove some partitions
+            delta = cg.MaximumActiveInstances - activeInstanceCount
+        }
+    }
+    if delta == 0 {
+        return
+    }
+    cg.doCoordinate(activeInstanceCount + delta)
+}
+
+func (cg *ConsumerGroup) doCoordinate(target int32) {
+    // stop existing instances from processing
+    var wg sync.WaitGroup
+    for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
+        if cgi.Val.Partition != nil {
+            wg.Add(1)
+            go func(cgi *ConsumerGroupInstance) {
+                defer wg.Done()
+                // stop processing
+                // flush internal state
+                // wait for all messages to be processed
+                // close the connection
+            }(cgi.Val)
+        }
+    }
+    wg.Wait()
+
+    partitions := topic.SplitPartitions(target)
+
+    // assign partitions to new instances
+    i := 0
+    for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
+        cgi.Val.Partition = partitions[i]
+        i++
+        wg.Add(1)
+        go func(cgi *ConsumerGroupInstance) {
+            defer wg.Done()
+            // start processing
+            // start consuming from the last offset
+        }(cgi.Val)
+    }
+    wg.Wait()
+}
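A minimal usage sketch of the methods above, assuming the caller initializes the ConsumerGroupInstances map itself (no constructor ships in this commit) and registers exactly as many instances as the coordination target, which is what the assignment loop in doCoordinate expects:

package main

import (
    "fmt"

    cmap "github.com/orcaman/concurrent-map/v2"

    "github.com/seaweedfs/seaweedfs/weed/mq/coordinator"
)

func main() {
    // No constructor is provided in this commit, so initialize the concurrent map directly.
    cg := &coordinator.ConsumerGroup{
        ConsumerGroupInstances: cmap.New[*coordinator.ConsumerGroupInstance](),
    }
    cg.SetMinMaxActiveInstances(3, 3)

    // Register three consumer group instances; none has a partition yet.
    for _, clientId := range []string{"client-1", "client-2", "client-3"} {
        cg.AddConsumerGroupInstance(clientId)
    }

    // All three instances are empty and the minimum is 3, so delta becomes 3
    // and doCoordinate splits the partition ring three ways.
    cg.CoordinateIfNeeded()

    for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
        p := cgi.Val.Partition
        fmt.Printf("%s -> [%d, %d) of ring %d\n", cgi.Key, p.RangeStart, p.RangeStop, p.RingSize)
    }
}

With three empty instances and a minimum of three, CoordinateIfNeeded computes delta = 3 and hands each instance one contiguous slice of the partition ring.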
diff --git a/weed/mq/coordinator/coordinator.go b/weed/mq/coordinator/coordinator.go
new file mode 100644
index 000000000..e94ac3371
--- /dev/null
+++ b/weed/mq/coordinator/coordinator.go
@@ -0,0 +1,36 @@
+package coordinator
+
+import (
+    cmap "github.com/orcaman/concurrent-map/v2"
+    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+)
+
+type ConsumerGroupInstance struct {
+    ClientId string
+    // the consumer group instance may not have an active partition
+    Partition *topic.Partition
+    // processed message count
+    ProcessedMessageCount int64
+}
+type ConsumerGroup struct {
+    // map a client id to a consumer group instance
+    ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
+    MinimumActiveInstances int32
+    MaximumActiveInstances int32
+}
+type TopicConsumerGroups struct {
+    // map a consumer group name to a consumer group
+    ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
+}
+
+// Coordinator coordinates the instances in the consumer group for one topic.
+// It is responsible for:
+// 1. Assigning partitions to consumer instances.
+// 2. Reassigning partitions when a consumer instance is down.
+// 3. Reassigning partitions when a consumer instance is up.
+type Coordinator struct {
+    // map client id to subscriber
+    Subscribers cmap.ConcurrentMap[string, *ConsumerGroupInstance]
+    // map topic name to consumer groups
+    TopicSubscribers cmap.ConcurrentMap[string, map[string]TopicConsumerGroups]
+}
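The types above are pure data; no lookup or registration helpers ship in this commit. Purely as an illustrative sketch, resolving a ConsumerGroup through the two-level TopicSubscribers map might look like the following; treating the inner map's key as a namespace is an assumption, since the commit does not name it:

package example

import (
    "github.com/seaweedfs/seaweedfs/weed/mq/coordinator"
)

// findConsumerGroup walks topic name -> inner key (assumed here to be a namespace)
// -> consumer group name. The helper and the namespace assumption are illustrative only.
func findConsumerGroup(c *coordinator.Coordinator, topicName, namespace, groupName string) (*coordinator.ConsumerGroup, bool) {
    groupsByKey, found := c.TopicSubscribers.Get(topicName)
    if !found {
        return nil, false
    }
    topicGroups, ok := groupsByKey[namespace]
    if !ok {
        return nil, false
    }
    return topicGroups.ConsumerGroups.Get(groupName)
}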
diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go
index 285bdcb36..79c830f13 100644
--- a/weed/mq/topic/partition.go
+++ b/weed/mq/topic/partition.go
@@ -30,3 +30,20 @@ func FromPbPartition(partition *mq_pb.Partition) Partition {
         RingSize:   partition.RingSize,
     }
 }
+
+func SplitPartitions(targetCount int32) []*Partition {
+    partitions := make([]*Partition, 0, targetCount)
+    partitionSize := PartitionCount / targetCount
+    for i := int32(0); i < targetCount; i++ {
+        partitionStop := (i + 1) * partitionSize
+        if i == targetCount-1 {
+            partitionStop = PartitionCount
+        }
+        partitions = append(partitions, &Partition{
+            RangeStart: i * partitionSize,
+            RangeStop:  partitionStop,
+            RingSize:   PartitionCount,
+        })
+    }
+    return partitions
+}
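To make the arithmetic concrete: SplitPartitions divides the ring into targetCount contiguous ranges of size PartitionCount / targetCount, with the last range extended to PartitionCount so the integer-division remainder is not lost. Assuming, purely for illustration, a ring size of 4096 and targetCount = 3, the ranges come out as [0, 1365), [1365, 2730), and [2730, 4096). A small sketch that prints whatever the package's actual PartitionCount yields:

package main

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
)

func main() {
    // Print the three ranges produced by a three-way split of the partition ring.
    // The concrete numbers depend on the package's PartitionCount constant.
    for _, p := range topic.SplitPartitions(3) {
        fmt.Printf("[%d, %d) of ring %d\n", p.RangeStart, p.RangeStop, p.RingSize)
    }
}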