aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/balancer/balancer.go12
-rw-r--r--weed/mq/balancer/lookup.go2
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go2
-rw-r--r--weed/mq/client/sub_client/subscribe.go5
-rw-r--r--weed/mq/client/sub_client/subscriber.go12
-rw-r--r--weed/mq/coordinator/consumer_group.go92
-rw-r--r--weed/mq/coordinator/coordinator.go36
-rw-r--r--weed/mq/topic/local_manager.go10
-rw-r--r--weed/mq/topic/partition.go17
-rw-r--r--weed/mq/topic/topic.go10
-rw-r--r--weed/mq/topic/topic_partition.go6
11 files changed, 175 insertions, 29 deletions
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go
index 258b29f2f..d93cc8de8 100644
--- a/weed/mq/balancer/balancer.go
+++ b/weed/mq/balancer/balancer.go
@@ -49,10 +49,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
for _, topicPartitionStats := range stats.Stats {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
- Namespace: topicPartitionStats.Topic.Namespace,
- Topic: topicPartitionStats.Topic.Name,
- RangeStart: topicPartitionStats.Partition.RangeStart,
- RangeStop: topicPartitionStats.Partition.RangeStop,
+ Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
+ Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize},
},
ConsumerCount: topicPartitionStats.ConsumerCount,
IsLeader: topicPartitionStats.IsLeader,
@@ -73,10 +71,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
- Namespace: t.Namespace,
- Topic: t.Name,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
+ Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
+ Partition: topic.Partition{RangeStart: partition.RangeStart, RangeStop: partition.RangeStop},
},
ConsumerCount: 0,
IsLeader: true,
diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go
index 7362fbab7..d5b78fc45 100644
--- a/weed/mq/balancer/lookup.go
+++ b/weed/mq/balancer/lookup.go
@@ -16,7 +16,7 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b
for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
topicPartitionStat := topicPartitionStatsItem.Val
if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
- topicPartitionStat.TopicPartition.Topic == topic.Name {
+ topicPartitionStat.TopicPartition.Name == topic.Name {
assignment := &mq_pb.BrokerPartitionAssignment{
Partition: &mq_pb.Partition{
RingSize: MaxPartitionCount,
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 93586e22d..74a3a9822 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb
topicPartitionStat := topicPartitionStatsItem.Val
topic := &mq_pb.Topic{
Namespace: topicPartitionStat.TopicPartition.Namespace,
- Name: topicPartitionStat.TopicPartition.Topic,
+ Name: topicPartitionStat.TopicPartition.Name,
}
topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name)
if _, found := knownTopics[topicKey]; found {
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index bfde6a512..0803b2c79 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -11,9 +11,14 @@ import (
func (sub *TopicSubscriber) Subscribe() error {
util.RetryUntil("subscribe", func() error {
+ // ask balancer for brokers of the topic
if err := sub.doLookup(sub.bootstrapBroker); err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
+ // treat the first broker as the topic leader
+ // connect to the leader broker
+
+ // subscribe to the topic
if err := sub.doProcess(); err != nil {
return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index f744c6fa2..809673de1 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -6,11 +6,13 @@ import (
)
type SubscriberConfiguration struct {
- ClientId string
- GroupId string
- GroupInstanceId string
- BootstrapServers []string
- GrpcDialOption grpc.DialOption
+ ClientId string
+ GroupId string
+ GroupInstanceId string
+ GroupMinimumPeers int32
+ GroupMaximumPeers int32
+ BootstrapServers []string
+ GrpcDialOption grpc.DialOption
}
type ContentConfiguration struct {
diff --git a/weed/mq/coordinator/consumer_group.go b/weed/mq/coordinator/consumer_group.go
new file mode 100644
index 000000000..e3dec493c
--- /dev/null
+++ b/weed/mq/coordinator/consumer_group.go
@@ -0,0 +1,92 @@
+package coordinator
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "sync"
+)
+
+// SetMinMaxActiveInstances stores the lower and upper bounds on the number of
+// active instances. The values are only recorded here; CoordinateIfNeeded reads
+// them later, treating min <= 0 as "no minimum" and max <= 0 as "no maximum".
+func (cg *ConsumerGroup) SetMinMaxActiveInstances(min, max int32) {
+	cg.MinimumActiveInstances, cg.MaximumActiveInstances = min, max
+}
+
+// AddConsumerGroupInstance creates an instance for clientId with no partition
+// assigned, stores it in ConsumerGroupInstances under clientId, and returns it.
+func (cg *ConsumerGroup) AddConsumerGroupInstance(clientId string) *ConsumerGroupInstance {
+	instance := new(ConsumerGroupInstance)
+	instance.ClientId = clientId
+	cg.ConsumerGroupInstances.Set(clientId, instance)
+	return instance
+}
+
+// RemoveConsumerGroupInstance removes the entry keyed by clientId from
+// ConsumerGroupInstances. It does not trigger a re-coordination by itself.
+func (cg *ConsumerGroup) RemoveConsumerGroupInstance(clientId string) {
+ cg.ConsumerGroupInstances.Remove(clientId)
+}
+
+// CoordinateIfNeeded decides how many instances should be active and calls
+// doCoordinate when that number differs from the current active count.
+//
+// An instance is "idle" when its Partition is nil and "active" otherwise.
+//   - With no minimum configured (MinimumActiveInstances <= 0), every idle
+//     instance is activated.
+//   - With a minimum configured, instances are activated only when the group
+//     is below the minimum and has enough members to reach it; the target is
+//     then exactly the minimum.
+//   - With a maximum configured (MaximumActiveInstances > 0), the target is
+//     never allowed to exceed it, whether the excess comes from instances that
+//     are already active or from idle instances about to be activated.
+func (cg *ConsumerGroup) CoordinateIfNeeded() {
+	var idleCount, activeCount int32
+	for item := range cg.ConsumerGroupInstances.IterBuffered() {
+		if item.Val.Partition == nil {
+			// this consumer group instance is not assigned a partition
+			idleCount++
+		} else {
+			activeCount++
+		}
+	}
+
+	target := activeCount
+	if idleCount > 0 {
+		switch {
+		case cg.MinimumActiveInstances <= 0:
+			// no minimum: activate every idle instance
+			target = activeCount + idleCount
+		case activeCount < cg.MinimumActiveInstances &&
+			activeCount+idleCount >= cg.MinimumActiveInstances:
+			// below the minimum and enough members to reach it
+			target = cg.MinimumActiveInstances
+		}
+	}
+
+	// Clamp after the growth decision so that activating idle instances can
+	// never push the group above the configured maximum.
+	if cg.MaximumActiveInstances > 0 && target > cg.MaximumActiveInstances {
+		target = cg.MaximumActiveInstances
+	}
+
+	if target == activeCount {
+		return
+	}
+	cg.doCoordinate(target)
+}
+
+// doCoordinate redistributes partitions so that at most target instances are
+// active.
+//
+// It runs in two phases, each waited on before continuing:
+//  1. every instance that currently holds a partition is asked to stop;
+//  2. the ring is split into target partitions via topic.SplitPartitions and
+//     the partitions are handed out to instances in iteration order.
+//
+// When the group has more instances than partitions, the surplus instances
+// get a nil Partition (idle) instead of indexing past the end of the slice.
+// The stop/start goroutine bodies are placeholders and currently do no work.
+func (cg *ConsumerGroup) doCoordinate(target int32) {
+	// stop existing instances from processing
+	var wg sync.WaitGroup
+	for item := range cg.ConsumerGroupInstances.IterBuffered() {
+		if item.Val.Partition == nil {
+			continue
+		}
+		wg.Add(1)
+		go func(instance *ConsumerGroupInstance) {
+			defer wg.Done()
+			// stop processing
+			// flush internal state
+			// wait for all messages to be processed
+			// close the connection
+		}(item.Val)
+	}
+	wg.Wait()
+
+	partitions := topic.SplitPartitions(target)
+
+	// assign partitions to instances
+	next := 0
+	for item := range cg.ConsumerGroupInstances.IterBuffered() {
+		if next >= len(partitions) {
+			// more instances than partitions: keep the surplus idle.
+			// Keep iterating (no break) so every surplus instance is cleared.
+			item.Val.Partition = nil
+			continue
+		}
+		item.Val.Partition = partitions[next]
+		next++
+		wg.Add(1)
+		go func(instance *ConsumerGroupInstance) {
+			defer wg.Done()
+			// start processing
+			// start consuming from the last offset
+		}(item.Val)
+	}
+	wg.Wait()
+}
diff --git a/weed/mq/coordinator/coordinator.go b/weed/mq/coordinator/coordinator.go
new file mode 100644
index 000000000..e94ac3371
--- /dev/null
+++ b/weed/mq/coordinator/coordinator.go
@@ -0,0 +1,36 @@
+package coordinator
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+)
+
+// ConsumerGroupInstance is one member of a consumer group, identified by the
+// client id it was registered with.
+type ConsumerGroupInstance struct {
+ ClientId string
+ // the consumer group instance may not have an active partition;
+ // nil means no partition is currently assigned
+ Partition *topic.Partition
+ // processed message count
+ ProcessedMessageCount int64
+}
+// ConsumerGroup holds the instances of one consumer group together with the
+// configured bounds on how many of them should hold a partition.
+type ConsumerGroup struct {
+ // map a client id to a consumer group instance
+ ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
+ // lower bound on active instances; values <= 0 are treated as "no minimum"
+ MinimumActiveInstances int32
+ // upper bound on active instances; values <= 0 are treated as "no maximum"
+ MaximumActiveInstances int32
+}
+// TopicConsumerGroups is the set of consumer groups, indexed by group name.
+// NOTE(review): presumably one value exists per topic — confirm against callers.
+type TopicConsumerGroups struct {
+ // map a consumer group name to a consumer group
+ ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
+}
+
+// Coordinator coordinates the instances in the consumer group for one topic.
+// It is responsible for:
+// 1. Assigning partitions to consumer instances.
+// 2. Reassigning partitions when a consumer instance is down.
+// 3. Reassigning partitions when a consumer instance is up.
+type Coordinator struct {
+ // map client id to subscriber
+ Subscribers cmap.ConcurrentMap[string, *ConsumerGroupInstance]
+ // map topic name to consumer groups
+ // NOTE(review): the value is a plain map[string]TopicConsumerGroups, while
+ // TopicConsumerGroups already maps a group name to its group. The key of
+ // the inner map is not documented — confirm the extra level is intended.
+ TopicSubscribers cmap.ConcurrentMap[string, map[string]TopicConsumerGroups]
+}
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index e3ee46a1e..0c54f2bb1 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -75,10 +75,12 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
for _, localPartition := range localTopic.Partitions {
topicPartition := &TopicPartition{
- Namespace: string(localTopic.Namespace),
- Topic: localTopic.Name,
- RangeStart: localPartition.RangeStart,
- RangeStop: localPartition.RangeStop,
+ Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name},
+ Partition: Partition{
+ RingSize: localPartition.RingSize,
+ RangeStart: localPartition.RangeStart,
+ RangeStop: localPartition.RangeStop,
+ },
}
stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{
Topic: &mq_pb.Topic{
diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go
index 285bdcb36..79c830f13 100644
--- a/weed/mq/topic/partition.go
+++ b/weed/mq/topic/partition.go
@@ -30,3 +30,20 @@ func FromPbPartition(partition *mq_pb.Partition) Partition {
RingSize: partition.RingSize,
}
}
+
+// SplitPartitions divides the ring [0, PartitionCount) into targetCount
+// contiguous, non-overlapping ranges and returns them in ascending order.
+//
+// Each range spans PartitionCount/targetCount slots (integer division); the
+// last range is extended to PartitionCount so the remainder is not lost.
+// Every returned Partition has RingSize set to PartitionCount.
+//
+// It returns nil when targetCount <= 0, which previously caused a
+// division-by-zero panic (0) or a panic in make (negative capacity).
+func SplitPartitions(targetCount int32) []*Partition {
+	if targetCount <= 0 {
+		return nil
+	}
+	partitions := make([]*Partition, 0, targetCount)
+	partitionSize := PartitionCount / targetCount
+	for i := int32(0); i < targetCount; i++ {
+		partitionStop := (i + 1) * partitionSize
+		if i == targetCount-1 {
+			// the last partition absorbs the remainder of the integer division
+			partitionStop = PartitionCount
+		}
+		partitions = append(partitions, &Partition{
+			RangeStart: i * partitionSize,
+			RangeStop:  partitionStop,
+			RingSize:   PartitionCount,
+		})
+	}
+	return partitions
+}
diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go
index 430999179..3d457e6f1 100644
--- a/weed/mq/topic/topic.go
+++ b/weed/mq/topic/topic.go
@@ -7,14 +7,12 @@ import (
"time"
)
-type Namespace string
-
type Topic struct {
- Namespace Namespace
+ Namespace string
Name string
}
-func NewTopic(namespace Namespace, name string) Topic {
+func NewTopic(namespace string, name string) Topic {
return Topic{
Namespace: namespace,
Name: name,
@@ -22,7 +20,7 @@ func NewTopic(namespace Namespace, name string) Topic {
}
func FromPbTopic(topic *mq_pb.Topic) Topic {
return Topic{
- Namespace: Namespace(topic.Namespace),
+ Namespace: topic.Namespace,
Name: topic.Name,
}
}
@@ -41,7 +39,7 @@ type Segment struct {
func FromPbSegment(segment *mq_pb.Segment) *Segment {
return &Segment{
Topic: Topic{
- Namespace: Namespace(segment.Namespace),
+ Namespace: segment.Namespace,
Name: segment.Topic,
},
Id: segment.Id,
diff --git a/weed/mq/topic/topic_partition.go b/weed/mq/topic/topic_partition.go
index 3d927b1d8..20b33a7e4 100644
--- a/weed/mq/topic/topic_partition.go
+++ b/weed/mq/topic/topic_partition.go
@@ -3,10 +3,8 @@ package topic
import "fmt"
type TopicPartition struct {
- Namespace string
- Topic string
- RangeStart int32
- RangeStop int32
+ Topic
+ Partition
}
func (tp *TopicPartition) String() string {