diff options
| -rw-r--r-- | weed/mq/balancer/balancer.go | 12 | ||||
| -rw-r--r-- | weed/mq/balancer/lookup.go | 2 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_lookup.go | 2 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscribe.go | 5 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 12 | ||||
| -rw-r--r-- | weed/mq/coordinator/consumer_group.go | 92 | ||||
| -rw-r--r-- | weed/mq/coordinator/coordinator.go | 36 | ||||
| -rw-r--r-- | weed/mq/topic/local_manager.go | 10 | ||||
| -rw-r--r-- | weed/mq/topic/partition.go | 17 | ||||
| -rw-r--r-- | weed/mq/topic/topic.go | 10 | ||||
| -rw-r--r-- | weed/mq/topic/topic_partition.go | 6 |
11 files changed, 175 insertions, 29 deletions
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index 258b29f2f..d93cc8de8 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -49,10 +49,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { for _, topicPartitionStats := range stats.Stats { tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ - Namespace: topicPartitionStats.Topic.Namespace, - Topic: topicPartitionStats.Topic.Name, - RangeStart: topicPartitionStats.Partition.RangeStart, - RangeStop: topicPartitionStats.Partition.RangeStop, + Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name}, + Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize}, }, ConsumerCount: topicPartitionStats.ConsumerCount, IsLeader: topicPartitionStats.IsLeader, @@ -73,10 +71,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) { tps := &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ - Namespace: t.Namespace, - Topic: t.Name, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, + Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name}, + Partition: topic.Partition{RangeStart: partition.RangeStart, RangeStop: partition.RangeStop}, }, ConsumerCount: 0, IsLeader: true, diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go index 7362fbab7..d5b78fc45 100644 --- a/weed/mq/balancer/lookup.go +++ b/weed/mq/balancer/lookup.go @@ -16,7 +16,7 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() { topicPartitionStat := topicPartitionStatsItem.Val if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && - topicPartitionStat.TopicPartition.Topic == topic.Name { + topicPartitionStat.TopicPartition.Name == topic.Name { assignment := &mq_pb.BrokerPartitionAssignment{ Partition: &mq_pb.Partition{ RingSize: MaxPartitionCount, diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 93586e22d..74a3a9822 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb topicPartitionStat := topicPartitionStatsItem.Val topic := &mq_pb.Topic{ Namespace: topicPartitionStat.TopicPartition.Namespace, - Name: topicPartitionStat.TopicPartition.Topic, + Name: topicPartitionStat.TopicPartition.Name, } topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name) if _, found := knownTopics[topicKey]; found { diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index bfde6a512..0803b2c79 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -11,9 +11,14 @@ import ( func (sub *TopicSubscriber) Subscribe() error { util.RetryUntil("subscribe", func() error { + // ask balancer for brokers of the topic if err := sub.doLookup(sub.bootstrapBroker); err != nil { return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) } + // treat the first broker as the topic leader + // connect to the leader broker + + // subscribe to the topic if err := sub.doProcess(); err != nil { return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) } diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index f744c6fa2..809673de1 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -6,11 +6,13 @@ import ( ) type SubscriberConfiguration struct { - ClientId string - GroupId string - GroupInstanceId string - BootstrapServers []string - GrpcDialOption grpc.DialOption + ClientId string + GroupId string + GroupInstanceId string + GroupMinimumPeers int32 + GroupMaximumPeers int32 + BootstrapServers []string + GrpcDialOption grpc.DialOption } type ContentConfiguration struct { diff --git a/weed/mq/coordinator/consumer_group.go b/weed/mq/coordinator/consumer_group.go new file mode 100644 index 000000000..e3dec493c --- /dev/null +++ b/weed/mq/coordinator/consumer_group.go @@ -0,0 +1,92 @@ +package coordinator + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "sync" +) + +func (cg *ConsumerGroup) SetMinMaxActiveInstances(min, max int32) { + cg.MinimumActiveInstances = min + cg.MaximumActiveInstances = max +} + +func (cg *ConsumerGroup) AddConsumerGroupInstance(clientId string) *ConsumerGroupInstance { + cgi := &ConsumerGroupInstance{ + ClientId: clientId, + } + cg.ConsumerGroupInstances.Set(clientId, cgi) + return cgi +} + +func (cg *ConsumerGroup) RemoveConsumerGroupInstance(clientId string) { + cg.ConsumerGroupInstances.Remove(clientId) +} + +func (cg *ConsumerGroup) CoordinateIfNeeded() { + emptyInstanceCount, activeInstanceCount := int32(0), int32(0) + for cgi := range cg.ConsumerGroupInstances.IterBuffered() { + if cgi.Val.Partition == nil { + // this consumer group instance is not assigned a partition + // need to assign one + emptyInstanceCount++ + } else { + activeInstanceCount++ + } + } + + var delta int32 + if emptyInstanceCount > 0 { + if cg.MinimumActiveInstances <= 0 { + // need to assign more partitions + delta = emptyInstanceCount + } else if activeInstanceCount < cg.MinimumActiveInstances && activeInstanceCount+emptyInstanceCount >= cg.MinimumActiveInstances { + // need to assign more partitions + delta = cg.MinimumActiveInstances - activeInstanceCount + } + } + + if cg.MaximumActiveInstances > 0 { + if activeInstanceCount > cg.MaximumActiveInstances { + // need to remove some partitions + delta = cg.MaximumActiveInstances - activeInstanceCount + } + } + if delta == 0 { + return + } + cg.doCoordinate(activeInstanceCount + delta) +} + +func (cg *ConsumerGroup) doCoordinate(target int32) { + // stop existing instances from processing + var wg sync.WaitGroup + for cgi := range cg.ConsumerGroupInstances.IterBuffered() { + if cgi.Val.Partition != nil { + wg.Add(1) + go func(cgi *ConsumerGroupInstance) { + defer wg.Done() + // stop processing + // flush internal state + // wait for all messages to be processed + // close the connection + }(cgi.Val) + } + } + wg.Wait() + + partitions := topic.SplitPartitions(target) + + // assign partitions to new instances + i := 0 + for cgi := range cg.ConsumerGroupInstances.IterBuffered() { + cgi.Val.Partition = partitions[i] + i++ + wg.Add(1) + go func(cgi *ConsumerGroupInstance) { + defer wg.Done() + // start processing + // start consuming from the last offset + }(cgi.Val) + } + wg.Wait() +} diff --git a/weed/mq/coordinator/coordinator.go b/weed/mq/coordinator/coordinator.go new file mode 100644 index 000000000..e94ac3371 --- /dev/null +++ b/weed/mq/coordinator/coordinator.go @@ -0,0 +1,36 @@ +package coordinator + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" +) + +type ConsumerGroupInstance struct { + ClientId string + // the consumer group instance may not have an active partition + Partition *topic.Partition + // processed message count + ProcessedMessageCount int64 +} +type ConsumerGroup struct { + // map a client id to a consumer group instance + ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance] + MinimumActiveInstances int32 + MaximumActiveInstances int32 +} +type TopicConsumerGroups struct { + // map a consumer group name to a consumer group + ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup] +} + +// Coordinator coordinates the instances in the consumer group for one topic. +// It is responsible for: +// 1. Assigning partitions to consumer instances. +// 2. Reassigning partitions when a consumer instance is down. +// 3. Reassigning partitions when a consumer instance is up. +type Coordinator struct { + // map client id to subscriber + Subscribers cmap.ConcurrentMap[string, *ConsumerGroupInstance] + // map topic name to consumer groups + TopicSubscribers cmap.ConcurrentMap[string, map[string]TopicConsumerGroups] +} diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index e3ee46a1e..0c54f2bb1 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -75,10 +75,12 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { for _, localPartition := range localTopic.Partitions { topicPartition := &TopicPartition{ - Namespace: string(localTopic.Namespace), - Topic: localTopic.Name, - RangeStart: localPartition.RangeStart, - RangeStop: localPartition.RangeStop, + Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name}, + Partition: Partition{ + RingSize: localPartition.RingSize, + RangeStart: localPartition.RangeStart, + RangeStop: localPartition.RangeStop, + }, } stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{ Topic: &mq_pb.Topic{ diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go index 285bdcb36..79c830f13 100644 --- a/weed/mq/topic/partition.go +++ b/weed/mq/topic/partition.go @@ -30,3 +30,20 @@ func FromPbPartition(partition *mq_pb.Partition) Partition { RingSize: partition.RingSize, } } + +func SplitPartitions(targetCount int32) []*Partition { + partitions := make([]*Partition, 0, targetCount) + partitionSize := PartitionCount / targetCount + for i := int32(0); i < targetCount; i++ { + partitionStop := (i + 1) * partitionSize + if i == targetCount-1 { + partitionStop = PartitionCount + } + partitions = append(partitions, &Partition{ + RangeStart: i * partitionSize, + RangeStop: partitionStop, + RingSize: PartitionCount, + }) + } + return partitions +} diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go index 430999179..3d457e6f1 100644 --- a/weed/mq/topic/topic.go +++ b/weed/mq/topic/topic.go @@ -7,14 +7,12 @@ import ( "time" ) -type Namespace string - type Topic struct { - Namespace Namespace + Namespace string Name string } -func NewTopic(namespace Namespace, name string) Topic { +func NewTopic(namespace string, name string) Topic { return Topic{ Namespace: namespace, Name: name, @@ -22,7 +20,7 @@ func NewTopic(namespace Namespace, name string) Topic { } func FromPbTopic(topic *mq_pb.Topic) Topic { return Topic{ - Namespace: Namespace(topic.Namespace), + Namespace: topic.Namespace, Name: topic.Name, } } @@ -41,7 +39,7 @@ type Segment struct { func FromPbSegment(segment *mq_pb.Segment) *Segment { return &Segment{ Topic: Topic{ - Namespace: Namespace(segment.Namespace), + Namespace: segment.Namespace, Name: segment.Topic, }, Id: segment.Id, diff --git a/weed/mq/topic/topic_partition.go b/weed/mq/topic/topic_partition.go index 3d927b1d8..20b33a7e4 100644 --- a/weed/mq/topic/topic_partition.go +++ b/weed/mq/topic/topic_partition.go @@ -3,10 +3,8 @@ package topic import "fmt" type TopicPartition struct { - Namespace string - Topic string - RangeStart int32 - RangeStop int32 + Topic + Partition } func (tp *TopicPartition) String() string { |
