Diffstat (limited to 'weed/mq/pub_balancer')
-rw-r--r--  weed/mq/pub_balancer/allocate.go                54
-rw-r--r--  weed/mq/pub_balancer/allocate_test.go           61
-rw-r--r--  weed/mq/pub_balancer/balance.go                 73
-rw-r--r--  weed/mq/pub_balancer/balance_action.go          58
-rw-r--r--  weed/mq/pub_balancer/balance_action_split.go    43
-rw-r--r--  weed/mq/pub_balancer/balance_brokers.go         52
-rw-r--r--  weed/mq/pub_balancer/balance_brokers_test.go    75
-rw-r--r--  weed/mq/pub_balancer/balancer.go                83
-rw-r--r--  weed/mq/pub_balancer/broker_stats.go            72
-rw-r--r--  weed/mq/pub_balancer/lookup.go                  53
-rw-r--r--  weed/mq/pub_balancer/partition_list_broker.go   50
-rw-r--r--  weed/mq/pub_balancer/repair.go                 127
-rw-r--r--  weed/mq/pub_balancer/repair_test.go             97
13 files changed, 898 insertions, 0 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go new file mode 100644 index 000000000..9b2113162 --- /dev/null +++ b/weed/mq/pub_balancer/allocate.go @@ -0,0 +1,54 @@ +package pub_balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "math/rand" +) + +func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) { + // divide the ring into partitions + rangeSize := MaxPartitionCount / partitionCount + for i := int32(0); i < partitionCount; i++ { + assignment := &mq_pb.BrokerPartitionAssignment{ + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: int32(i * rangeSize), + RangeStop: int32((i + 1) * rangeSize), + }, + } + if i == partitionCount-1 { + assignment.Partition.RangeStop = MaxPartitionCount + } + assignments = append(assignments, assignment) + } + + // pick the brokers + pickedBrokers := pickBrokers(brokers, partitionCount) + + // assign the partitions to brokers + for i, assignment := range assignments { + assignment.LeaderBroker = pickedBrokers[i] + } + glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments) + return +} + +// for now: randomly pick brokers +// TODO pick brokers based on the broker stats +func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) []string { + candidates := make([]string, 0, brokers.Count()) + for brokerStatsItem := range brokers.IterBuffered() { + candidates = append(candidates, brokerStatsItem.Key) + } + pickedBrokers := make([]string, 0, count) + for i := int32(0); i < count; i++ { + p := rand.Int() % len(candidates) + if p < 0 { + p = -p + } + pickedBrokers = append(pickedBrokers, candidates[p]) + } + return pickedBrokers +} diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go new file mode 100644 index 000000000..298b9ebc1 --- /dev/null +++ b/weed/mq/pub_balancer/allocate_test.go @@ -0,0 +1,61 @@ +package pub_balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "reflect" + "testing" +) + +func Test_allocateOneBroker(t *testing.T) { + brokers := cmap.New[*BrokerStats]() + brokers.SetIfAbsent("localhost:17777", &BrokerStats{ + TopicPartitionCount: 0, + ConsumerCount: 0, + CpuUsagePercent: 0, + }) + + tests := []struct { + name string + args args + wantAssignments []*mq_pb.BrokerPartitionAssignment + }{ + { + name: "test only one broker", + args: args{ + brokers: brokers, + partitionCount: 1, + }, + wantAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:17777", + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: 0, + RangeStop: MaxPartitionCount, + }, + }, + }, + }, + } + testThem(t, tests) +} + +type args struct { + brokers cmap.ConcurrentMap[string, *BrokerStats] + partitionCount int32 +} + +func testThem(t *testing.T, tests []struct { + name string + args args + wantAssignments []*mq_pb.BrokerPartitionAssignment +}) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) { + t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments) + } + }) + } +} diff --git a/weed/mq/pub_balancer/balance.go 
b/weed/mq/pub_balancer/balance.go new file mode 100644 index 000000000..87fc5739b --- /dev/null +++ b/weed/mq/pub_balancer/balance.go @@ -0,0 +1,73 @@ +package pub_balancer + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "google.golang.org/grpc" +) + +/* +* Assuming a topic has [x,y] number of partitions when publishing, and there are b number of brokers. +* and p is the number of partitions per topic. +* if the broker number b <= x, then p = x. +* if the broker number x < b < y, then x <= p <= b. +* if the broker number b >= y, x <= p <= y + +Balance topic partitions to brokers +=================================== + +When the goal is to make sure that low traffic partitions can be merged, (and p >= x, and after last rebalance interval): +1. Calculate the average load(throughput) of partitions per topic. +2. If any two neighboring partitions have a load that is less than the average load, merge them. +3. If min(b, y) < p, then merge two neighboring partitions that have the least combined load. + +When the goal is to make sure that high traffic partitions can be split, (and p < y and p < b, and after last rebalance interval): +1. Calculate the average number of partitions per broker. +2. If any partition has a load that is more than the average load, split it into two partitions. + +When the goal is to make sure that each broker has the same number of partitions: +1. Calculate the average number of partitions per broker. +2. For the brokers that have more than the average number of partitions, move the partitions to the brokers that have less than the average number of partitions. + +*/ + +type BalanceAction interface { +} +type BalanceActionMerge struct { + Before []topic.TopicPartition + After topic.TopicPartition +} +type BalanceActionSplit struct { + Before topic.TopicPartition + After []topic.TopicPartition +} + +type BalanceActionMove struct { + TopicPartition topic.TopicPartition + SourceBroker string + TargetBroker string +} + +type BalanceActionCreate struct { + TopicPartition topic.TopicPartition + TargetBroker string +} + +// BalancePublishers check the stats of all brokers, +// and balance the publishers to the brokers. 
+func (balancer *Balancer) BalancePublishers() []BalanceAction { + action := BalanceTopicPartitionOnBrokers(balancer.Brokers) + return []BalanceAction{action} +} + +func (balancer *Balancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) { + for _, action := range actions { + switch action.(type) { + case *BalanceActionMove: + err = balancer.ExecuteBalanceActionMove(action.(*BalanceActionMove), grpcDialOption) + } + if err != nil { + return err + } + } + return nil +} diff --git a/weed/mq/pub_balancer/balance_action.go b/weed/mq/pub_balancer/balance_action.go new file mode 100644 index 000000000..c29ec3469 --- /dev/null +++ b/weed/mq/pub_balancer/balance_action.go @@ -0,0 +1,58 @@ +package pub_balancer + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" +) + +// Balancer <= PublisherToPubBalancer() <= Broker <=> Publish() +// ExecuteBalanceActionMove from Balancer => AssignTopicPartitions() => Broker => Publish() + +func (balancer *Balancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error { + if _, found := balancer.Brokers.Get(move.SourceBroker); !found { + return fmt.Errorf("source broker %s not found", move.SourceBroker) + } + if _, found := balancer.Brokers.Get(move.TargetBroker); !found { + return fmt.Errorf("target broker %s not found", move.TargetBroker) + } + + err := pb.WithBrokerGrpcClient(false, move.TargetBroker, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.AssignTopicPartitions(context.Background(), &mq_pb.AssignTopicPartitionsRequest{ + Topic: move.TopicPartition.Topic.ToPbTopic(), + BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + Partition: move.TopicPartition.ToPbPartition(), + }, + }, + IsLeader: true, + IsDraining: false, + }) + return err + }) + if err != nil { + return fmt.Errorf("assign topic partition %v to %s: %v", move.TopicPartition, move.TargetBroker, err) + } + + err = pb.WithBrokerGrpcClient(false, move.SourceBroker, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.AssignTopicPartitions(context.Background(), &mq_pb.AssignTopicPartitionsRequest{ + Topic: move.TopicPartition.Topic.ToPbTopic(), + BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{ + { + Partition: move.TopicPartition.ToPbPartition(), + }, + }, + IsLeader: true, + IsDraining: true, + }) + return err + }) + if err != nil { + return fmt.Errorf("assign topic partition %v to %s: %v", move.TopicPartition, move.SourceBroker, err) + } + + return nil + +} diff --git a/weed/mq/pub_balancer/balance_action_split.go b/weed/mq/pub_balancer/balance_action_split.go new file mode 100644 index 000000000..6d317ffb9 --- /dev/null +++ b/weed/mq/pub_balancer/balance_action_split.go @@ -0,0 +1,43 @@ +package pub_balancer + +/* +Sequence of operations to ensure ordering + +Assuming Publisher P10 is publishing to Topic Partition TP10, and Subscriber S10 is subscribing to Topic TP10. +After splitting Topic TP10 into Topic Partition TP11 and Topic Partition TP21, +Publisher P11 is publishing to Topic Partition TP11, and Publisher P21 is publishing to Topic Partition TP21. +Subscriber S12 is subscribing to Topic Partition TP11, and Subscriber S21 is subscribing to Topic Partition TP21. + +(The last digit is ephoch generation number, which is increasing when the topic partitioning is changed.) 
+ +The diagram is as follows: +P10 -> TP10 -> S10 + || + \/ +P11 -> TP11 -> S11 +P21 -> TP21 -> S21 + +The following is the sequence of events: +1. Create Topic Partition TP11 and TP21 +2. Close Publisher(s) P10 +3. Close Subscriber(s) S10 +4. Close Topic Partition TP10 +5. Start Publisher P11, P21 +6. Start Subscriber S11, S21 + +The dependency is as follows: + 2 => 3 => 4 + | | + v v + 1 => (5 | 6) + +And also: +2 => 5 +3 => 6 + +For brokers: +1. Close all publishers for a topic partition +2. Close all subscribers for a topic partition +3. Close the topic partition + +*/ diff --git a/weed/mq/pub_balancer/balance_brokers.go b/weed/mq/pub_balancer/balance_brokers.go new file mode 100644 index 000000000..a6b25b7ca --- /dev/null +++ b/weed/mq/pub_balancer/balance_brokers.go @@ -0,0 +1,52 @@ +package pub_balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "math/rand" +) + +func BalanceTopicPartitionOnBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats]) BalanceAction { + // 1. calculate the average number of partitions per broker + var totalPartitionCount int32 + var totalBrokerCount int32 + for brokerStats := range brokers.IterBuffered() { + totalBrokerCount++ + totalPartitionCount += brokerStats.Val.TopicPartitionCount + } + averagePartitionCountPerBroker := totalPartitionCount / totalBrokerCount + minPartitionCountPerBroker := averagePartitionCountPerBroker + maxPartitionCountPerBroker := averagePartitionCountPerBroker + var sourceBroker, targetBroker string + var candidatePartition *topic.TopicPartition + for brokerStats := range brokers.IterBuffered() { + if minPartitionCountPerBroker > brokerStats.Val.TopicPartitionCount { + minPartitionCountPerBroker = brokerStats.Val.TopicPartitionCount + targetBroker = brokerStats.Key + } + if maxPartitionCountPerBroker < brokerStats.Val.TopicPartitionCount { + maxPartitionCountPerBroker = brokerStats.Val.TopicPartitionCount + sourceBroker = brokerStats.Key + // select a random partition from the source broker + randomePartitionIndex := rand.Intn(int(brokerStats.Val.TopicPartitionCount)) + index := 0 + for topicPartitionStats := range brokerStats.Val.TopicPartitionStats.IterBuffered() { + if index == randomePartitionIndex { + candidatePartition = &topicPartitionStats.Val.TopicPartition + break + } else { + index++ + } + } + } + } + if minPartitionCountPerBroker >= maxPartitionCountPerBroker-1 { + return nil + } + // 2. 
move the partitions from the source broker to the target broker + return &BalanceActionMove{ + TopicPartition: *candidatePartition, + SourceBroker: sourceBroker, + TargetBroker: targetBroker, + } +} diff --git a/weed/mq/pub_balancer/balance_brokers_test.go b/weed/mq/pub_balancer/balance_brokers_test.go new file mode 100644 index 000000000..54667d154 --- /dev/null +++ b/weed/mq/pub_balancer/balance_brokers_test.go @@ -0,0 +1,75 @@ +package pub_balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "reflect" + "testing" +) + +func TestBalanceTopicPartitionOnBrokers(t *testing.T) { + + brokers := cmap.New[*BrokerStats]() + broker1Stats := &BrokerStats{ + TopicPartitionCount: 1, + ConsumerCount: 1, + CpuUsagePercent: 1, + TopicPartitionStats: cmap.New[*TopicPartitionStats](), + } + broker1Stats.TopicPartitionStats.Set("topic1:0", &TopicPartitionStats{ + TopicPartition: topic.TopicPartition{ + Topic: topic.Topic{Namespace: "topic1", Name: "topic1"}, + Partition: topic.Partition{RangeStart: 0, RangeStop: 512, RingSize: 1024}, + }, + ConsumerCount: 1, + IsLeader: true, + }) + broker2Stats := &BrokerStats{ + TopicPartitionCount: 2, + ConsumerCount: 1, + CpuUsagePercent: 1, + TopicPartitionStats: cmap.New[*TopicPartitionStats](), + } + broker2Stats.TopicPartitionStats.Set("topic1:1", &TopicPartitionStats{ + TopicPartition: topic.TopicPartition{ + Topic: topic.Topic{Namespace: "topic1", Name: "topic1"}, + Partition: topic.Partition{RangeStart: 512, RangeStop: 1024, RingSize: 1024}, + }, + ConsumerCount: 1, + IsLeader: true, + }) + broker2Stats.TopicPartitionStats.Set("topic2:0", &TopicPartitionStats{ + TopicPartition: topic.TopicPartition{ + Topic: topic.Topic{Namespace: "topic2", Name: "topic2"}, + Partition: topic.Partition{RangeStart: 0, RangeStop: 1024, RingSize: 1024}, + }, + ConsumerCount: 1, + IsLeader: true, + }) + brokers.Set("broker1", broker1Stats) + brokers.Set("broker2", broker2Stats) + + type args struct { + brokers cmap.ConcurrentMap[string, *BrokerStats] + } + tests := []struct { + name string + args args + want BalanceAction + }{ + { + name: "test", + args: args{ + brokers: brokers, + }, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := BalanceTopicPartitionOnBrokers(tt.args.brokers); !reflect.DeepEqual(got, tt.want) { + t.Errorf("BalanceTopicPartitionOnBrokers() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go new file mode 100644 index 000000000..988b971af --- /dev/null +++ b/weed/mq/pub_balancer/balancer.go @@ -0,0 +1,83 @@ +package pub_balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +const ( + MaxPartitionCount = 8 * 9 * 5 * 7 //2520 + LockBrokerBalancer = "broker_balancer" +) + +// Balancer collects stats from all brokers. +// +// When publishers wants to create topics, it picks brokers to assign the topic partitions. +// When consumers wants to subscribe topics, it tells which brokers are serving the topic partitions. +// +// When a partition needs to be split or merged, or a partition needs to be moved to another broker, +// the balancer will let the broker tell the consumer instance to stop processing the partition. +// The existing consumer instance will flush the internal state, and then stop processing. 
+// Then the balancer will tell the brokers to start sending new messages in the new/moved partition to the consumer instances. +// +// Failover to standby consumer instances: +// +// A consumer group can have min and max number of consumer instances. +// For consumer instances joined after the max number, they will be in standby mode. +// +// When a consumer instance is down, the broker will notice this and inform the balancer. +// The balancer will then tell the broker to send the partition to another standby consumer instance. +type Balancer struct { + Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address + // Collected from all brokers when they connect to the broker leader + TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name +} +func NewBalancer() *Balancer { + return &Balancer{ + Brokers: cmap.New[*BrokerStats](), + TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](), + } +} + +func (balancer *Balancer) OnBrokerConnected(broker string) (brokerStats *BrokerStats) { + var found bool + brokerStats, found = balancer.Brokers.Get(broker) + if !found { + brokerStats = NewBrokerStats() + if !balancer.Brokers.SetIfAbsent(broker, brokerStats) { + brokerStats, _ = balancer.Brokers.Get(broker) + } + } + return brokerStats +} + +func (balancer *Balancer) OnBrokerDisconnected(broker string, stats *BrokerStats) { + balancer.Brokers.Remove(broker) + + // update TopicToBrokers + for _, topic := range stats.Topics { + partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topic.String()) + if !found { + continue + } + partitionSlotToBrokerList.RemoveBroker(broker) + } +} + +func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) { + brokerStats.UpdateStats(receivedStats) + + // update TopicToBrokers + for _, topicPartitionStats := range receivedStats.Stats { + topic := topicPartitionStats.Topic + partition := topicPartitionStats.Partition + partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topic.String()) + if !found { + partitionSlotToBrokerList = NewPartitionSlotToBrokerList(MaxPartitionCount) + if !balancer.TopicToBrokers.SetIfAbsent(topic.String(), partitionSlotToBrokerList) { + partitionSlotToBrokerList, _ = balancer.TopicToBrokers.Get(topic.String()) + } + } + partitionSlotToBrokerList.AddBroker(partition, broker) + } +} diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go new file mode 100644 index 000000000..461e93c61 --- /dev/null +++ b/weed/mq/pub_balancer/broker_stats.go @@ -0,0 +1,72 @@ +package pub_balancer + +import ( + "fmt" + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +type BrokerStats struct { + TopicPartitionCount int32 + ConsumerCount int32 + CpuUsagePercent int32 + TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition + Topics []topic.Topic +} +type TopicPartitionStats struct { + topic.TopicPartition + ConsumerCount int32 + IsLeader bool +} + +func NewBrokerStats() *BrokerStats { + return &BrokerStats{ + TopicPartitionStats: cmap.New[*TopicPartitionStats](), + } +} +func (bs *BrokerStats) String() string { + return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}", + bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items()) +} + +func (bs *BrokerStats) UpdateStats(stats 
*mq_pb.BrokerStats) { + bs.TopicPartitionCount = int32(len(stats.Stats)) + bs.CpuUsagePercent = stats.CpuUsagePercent + + var consumerCount int32 + currentTopicPartitions := bs.TopicPartitionStats.Items() + for _, topicPartitionStats := range stats.Stats { + tps := &TopicPartitionStats{ + TopicPartition: topic.TopicPartition{ + Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name}, + Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize}, + }, + ConsumerCount: topicPartitionStats.ConsumerCount, + IsLeader: topicPartitionStats.IsLeader, + } + consumerCount += topicPartitionStats.ConsumerCount + key := tps.TopicPartition.String() + bs.TopicPartitionStats.Set(key, tps) + delete(currentTopicPartitions, key) + } + // remove the topic partitions that are not in the stats + for key := range currentTopicPartitions { + bs.TopicPartitionStats.Remove(key) + } + bs.ConsumerCount = consumerCount + +} + +func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) { + tps := &TopicPartitionStats{ + TopicPartition: topic.TopicPartition{ + Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name}, + Partition: topic.Partition{RangeStart: partition.RangeStart, RangeStop: partition.RangeStop}, + }, + ConsumerCount: 0, + IsLeader: true, + } + key := tps.TopicPartition.String() + bs.TopicPartitionStats.Set(key, tps) +} diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go new file mode 100644 index 000000000..3e103a650 --- /dev/null +++ b/weed/mq/pub_balancer/lookup.go @@ -0,0 +1,53 @@ +package pub_balancer + +import ( + "errors" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +var ( + ErrNoBroker = errors.New("no broker") +) + +func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { + if partitionCount == 0 { + partitionCount = 6 + } + // find existing topic partition assignments + for brokerStatsItem := range balancer.Brokers.IterBuffered() { + broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val + for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() { + topicPartitionStat := topicPartitionStatsItem.Val + if topicPartitionStat.TopicPartition.Namespace == topic.Namespace && + topicPartitionStat.TopicPartition.Name == topic.Name { + assignment := &mq_pb.BrokerPartitionAssignment{ + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: topicPartitionStat.RangeStart, + RangeStop: topicPartitionStat.RangeStop, + }, + } + // TODO fix follower setting + assignment.LeaderBroker = broker + assignments = append(assignments, assignment) + } + } + } + if len(assignments) > 0 && len(assignments) == int(partitionCount) || !publish { + glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments) + return assignments, nil + } + + // find the topic partitions on the filer + // if the topic is not found + // if the request is_for_publish + // create the topic + // if the request is_for_subscribe + // return error not found + // t := topic.FromPbTopic(request.Topic) + if balancer.Brokers.IsEmpty() { + return nil, ErrNoBroker + } + return allocateTopicPartitions(balancer.Brokers, partitionCount), nil +} diff --git 
a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go new file mode 100644 index 000000000..7ceb2a9fc --- /dev/null +++ b/weed/mq/pub_balancer/partition_list_broker.go @@ -0,0 +1,50 @@ +package pub_balancer + +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +type PartitionSlotToBroker struct { + RangeStart int32 + RangeStop int32 + AssignedBroker string +} + +type PartitionSlotToBrokerList struct { + PartitionSlots []*PartitionSlotToBroker + RingSize int32 +} + +func NewPartitionSlotToBrokerList(ringSize int32) *PartitionSlotToBrokerList { + return &PartitionSlotToBrokerList{ + RingSize: ringSize, + } +} + +func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broker string) { + for _, partitionSlot := range ps.PartitionSlots { + if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop { + if partitionSlot.AssignedBroker == broker { + return + } + if partitionSlot.AssignedBroker != "" { + glog.V(0).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker) + } + partitionSlot.AssignedBroker = broker + return + } + } + ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{ + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + AssignedBroker: broker, + }) +} +func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) { + for _, partitionSlot := range ps.PartitionSlots { + if partitionSlot.AssignedBroker == broker { + partitionSlot.AssignedBroker = "" + } + } +} diff --git a/weed/mq/pub_balancer/repair.go b/weed/mq/pub_balancer/repair.go new file mode 100644 index 000000000..0ab1a5ea9 --- /dev/null +++ b/weed/mq/pub_balancer/repair.go @@ -0,0 +1,127 @@ +package pub_balancer + +import ( + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "math/rand" + "modernc.org/mathutil" + "sort" +) + +func (balancer *Balancer) RepairTopics() []BalanceAction { + action := BalanceTopicPartitionOnBrokers(balancer.Brokers) + return []BalanceAction{action} +} + +type TopicPartitionInfo struct { + Leader string + Followers []string +} + +// RepairMissingTopicPartitions check the stats of all brokers, +// and repair the missing topic partitions on the brokers. 
+func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats]) (actions []BalanceAction) { + + // find all topic partitions + topicToTopicPartitions := make(map[topic.Topic]map[topic.Partition]*TopicPartitionInfo) + for brokerStatsItem := range brokers.IterBuffered() { + broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val + for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() { + topicPartitionStat := topicPartitionStatsItem.Val + topicPartitionToInfo, found := topicToTopicPartitions[topicPartitionStat.Topic] + if !found { + topicPartitionToInfo = make(map[topic.Partition]*TopicPartitionInfo) + topicToTopicPartitions[topicPartitionStat.Topic] = topicPartitionToInfo + } + tpi, found := topicPartitionToInfo[topicPartitionStat.Partition] + if !found { + tpi = &TopicPartitionInfo{} + topicPartitionToInfo[topicPartitionStat.Partition] = tpi + } + if topicPartitionStat.IsLeader { + tpi.Leader = broker + } else { + tpi.Followers = append(tpi.Followers, broker) + } + } + } + + // collect all brokers as candidates + candidates := make([]string, 0, brokers.Count()) + for brokerStatsItem := range brokers.IterBuffered() { + candidates = append(candidates, brokerStatsItem.Key) + } + + // find the missing topic partitions + for t, topicPartitionToInfo := range topicToTopicPartitions { + missingPartitions := EachTopicRepairMissingTopicPartitions(t, topicPartitionToInfo) + for _, partition := range missingPartitions { + actions = append(actions, BalanceActionCreate{ + TopicPartition: topic.TopicPartition{ + Topic: t, + Partition: partition, + }, + TargetBroker: candidates[rand.Intn(len(candidates))], + }) + } + } + + return actions +} + +func EachTopicRepairMissingTopicPartitions(t topic.Topic, info map[topic.Partition]*TopicPartitionInfo) (missingPartitions []topic.Partition) { + + // find the missing topic partitions + var partitions []topic.Partition + for partition := range info { + partitions = append(partitions, partition) + } + return findMissingPartitions(partitions, MaxPartitionCount) +} + +// findMissingPartitions find the missing partitions +func findMissingPartitions(partitions []topic.Partition, ringSize int32) (missingPartitions []topic.Partition) { + // sort the partitions by range start + sort.Slice(partitions, func(i, j int) bool { + return partitions[i].RangeStart < partitions[j].RangeStart + }) + + // calculate the average partition size + var covered int32 + for _, partition := range partitions { + covered += partition.RangeStop - partition.RangeStart + } + averagePartitionSize := covered / int32(len(partitions)) + + // find the missing partitions + var coveredWatermark int32 + i := 0 + for i < len(partitions) { + partition := partitions[i] + if partition.RangeStart > coveredWatermark { + upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, partition.RangeStart) + missingPartitions = append(missingPartitions, topic.Partition{ + RangeStart: coveredWatermark, + RangeStop: upperBound, + RingSize: ringSize, + }) + coveredWatermark = upperBound + if coveredWatermark == partition.RangeStop { + i++ + } + } else { + coveredWatermark = partition.RangeStop + i++ + } + } + for coveredWatermark < ringSize { + upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, ringSize) + missingPartitions = append(missingPartitions, topic.Partition{ + RangeStart: coveredWatermark, + RangeStop: upperBound, + RingSize: ringSize, + }) + coveredWatermark = upperBound + } + return missingPartitions +} diff --git 
a/weed/mq/pub_balancer/repair_test.go b/weed/mq/pub_balancer/repair_test.go new file mode 100644 index 000000000..08465c7e8 --- /dev/null +++ b/weed/mq/pub_balancer/repair_test.go @@ -0,0 +1,97 @@ +package pub_balancer + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "reflect" + "testing" +) + +func Test_findMissingPartitions(t *testing.T) { + type args struct { + partitions []topic.Partition + } + tests := []struct { + name string + args args + wantMissingPartitions []topic.Partition + }{ + { + name: "one partition", + args: args{ + partitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 0, RangeStop: 1024}, + }, + }, + wantMissingPartitions: nil, + }, + { + name: "two partitions", + args: args{ + partitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 0, RangeStop: 512}, + {RingSize: 1024, RangeStart: 512, RangeStop: 1024}, + }, + }, + wantMissingPartitions: nil, + }, + { + name: "four partitions, missing last two", + args: args{ + partitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 0, RangeStop: 256}, + {RingSize: 1024, RangeStart: 256, RangeStop: 512}, + }, + }, + wantMissingPartitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 512, RangeStop: 768}, + {RingSize: 1024, RangeStart: 768, RangeStop: 1024}, + }, + }, + { + name: "four partitions, missing first two", + args: args{ + partitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 512, RangeStop: 768}, + {RingSize: 1024, RangeStart: 768, RangeStop: 1024}, + }, + }, + wantMissingPartitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 0, RangeStop: 256}, + {RingSize: 1024, RangeStart: 256, RangeStop: 512}, + }, + }, + { + name: "four partitions, missing middle two", + args: args{ + partitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 0, RangeStop: 256}, + {RingSize: 1024, RangeStart: 768, RangeStop: 1024}, + }, + }, + wantMissingPartitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 256, RangeStop: 512}, + {RingSize: 1024, RangeStart: 512, RangeStop: 768}, + }, + }, + { + name: "four partitions, missing three", + args: args{ + partitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 512, RangeStop: 768}, + }, + }, + wantMissingPartitions: []topic.Partition{ + {RingSize: 1024, RangeStart: 0, RangeStop: 256}, + {RingSize: 1024, RangeStart: 256, RangeStop: 512}, + {RingSize: 1024, RangeStart: 768, RangeStop: 1024}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotMissingPartitions := findMissingPartitions(tt.args.partitions, 1024); !reflect.DeepEqual(gotMissingPartitions, tt.wantMissingPartitions) { + t.Errorf("findMissingPartitions() = %v, want %v", gotMissingPartitions, tt.wantMissingPartitions) + } + }) + } +} |
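Note: a minimal, standalone sketch of the ring-range math used by allocateTopicPartitions above. The ring of MaxPartitionCount slots is cut into partitionCount contiguous ranges, and the last range is stretched to the end of the ring so that integer division never leaves slots uncovered. The splitRing helper below is hypothetical and not part of this change.

package main

import "fmt"

const maxPartitionCount = 8 * 9 * 5 * 7 // 2520, the ring size used by the balancer

// splitRing is a hypothetical helper mirroring the range math in allocateTopicPartitions:
// each partition gets ringSize/partitionCount slots, and the final partition absorbs the
// remainder left over by integer division so the whole ring stays covered.
func splitRing(ringSize, partitionCount int32) [][2]int32 {
	rangeSize := ringSize / partitionCount
	ranges := make([][2]int32, 0, partitionCount)
	for i := int32(0); i < partitionCount; i++ {
		start, stop := i*rangeSize, (i+1)*rangeSize
		if i == partitionCount-1 {
			stop = ringSize // last range is widened to cover leftover slots
		}
		ranges = append(ranges, [2]int32{start, stop})
	}
	return ranges
}

func main() {
	// 2520 / 6 = 420, so the default 6 partitions cover [0 420] [420 840] ... [2100 2520]
	fmt.Println(splitRing(maxPartitionCount, 6))
}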

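A similarly hedged sketch of the selection rule in BalanceTopicPartitionOnBrokers: track the most- and least-loaded brokers against the average partition count, and only propose a move when the spread exceeds one partition (the minPartitionCountPerBroker >= maxPartitionCountPerBroker-1 guard). Plain maps stand in for the concurrent maps used in the change, and pickMoveCandidate is a hypothetical name, not an API from this diff.

package main

import "fmt"

// pickMoveCandidate is a hypothetical, simplified version of the broker selection in
// BalanceTopicPartitionOnBrokers: it returns the source and target brokers for a move,
// or ok=false when partition counts already differ by at most one.
func pickMoveCandidate(partitionCounts map[string]int32) (source, target string, ok bool) {
	var total, count int32
	for _, c := range partitionCounts {
		total += c
		count++
	}
	if count == 0 {
		return "", "", false
	}
	// start both bounds from the average, as the balancer does
	minCount, maxCount := total/count, total/count
	for broker, c := range partitionCounts {
		if c < minCount {
			minCount, target = c, broker
		}
		if c > maxCount {
			maxCount, source = c, broker
		}
	}
	// a spread of one partition is already as balanced as it gets
	if minCount >= maxCount-1 {
		return "", "", false
	}
	return source, target, true
}

func main() {
	fmt.Println(pickMoveCandidate(map[string]int32{"broker1": 1, "broker2": 2})) // balanced: ok=false
	fmt.Println(pickMoveCandidate(map[string]int32{"broker1": 0, "broker2": 3})) // move from broker2 to broker1
}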