path: root/weed/mq/pub_balancer
Diffstat (limited to 'weed/mq/pub_balancer')
-rw-r--r--  weed/mq/pub_balancer/allocate.go               54
-rw-r--r--  weed/mq/pub_balancer/allocate_test.go          61
-rw-r--r--  weed/mq/pub_balancer/balance.go                73
-rw-r--r--  weed/mq/pub_balancer/balance_action.go         58
-rw-r--r--  weed/mq/pub_balancer/balance_action_split.go   43
-rw-r--r--  weed/mq/pub_balancer/balance_brokers.go        52
-rw-r--r--  weed/mq/pub_balancer/balance_brokers_test.go   75
-rw-r--r--  weed/mq/pub_balancer/balancer.go               83
-rw-r--r--  weed/mq/pub_balancer/broker_stats.go           72
-rw-r--r--  weed/mq/pub_balancer/lookup.go                 53
-rw-r--r--  weed/mq/pub_balancer/partition_list_broker.go  50
-rw-r--r--  weed/mq/pub_balancer/repair.go                127
-rw-r--r--  weed/mq/pub_balancer/repair_test.go            97
13 files changed, 898 insertions, 0 deletions
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
new file mode 100644
index 000000000..9b2113162
--- /dev/null
+++ b/weed/mq/pub_balancer/allocate.go
@@ -0,0 +1,54 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "math/rand"
+)
+
+func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) {
+ // divide the ring into partitions
+ rangeSize := MaxPartitionCount / partitionCount
+ for i := int32(0); i < partitionCount; i++ {
+ assignment := &mq_pb.BrokerPartitionAssignment{
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: int32(i * rangeSize),
+ RangeStop: int32((i + 1) * rangeSize),
+ },
+ }
+ if i == partitionCount-1 {
+ assignment.Partition.RangeStop = MaxPartitionCount
+ }
+ assignments = append(assignments, assignment)
+ }
+
+ // pick the brokers
+ pickedBrokers := pickBrokers(brokers, partitionCount)
+
+ // assign the partitions to brokers
+ for i, assignment := range assignments {
+ assignment.LeaderBroker = pickedBrokers[i]
+ }
+ glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
+ return
+}
+
+// for now: randomly pick brokers
+// TODO pick brokers based on the broker stats
+func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) []string {
+ candidates := make([]string, 0, brokers.Count())
+ for brokerStatsItem := range brokers.IterBuffered() {
+ candidates = append(candidates, brokerStatsItem.Key)
+ }
+ pickedBrokers := make([]string, 0, count)
+ for i := int32(0); i < count; i++ {
+ p := rand.Int() % len(candidates)
+ if p < 0 {
+ p = -p
+ }
+ pickedBrokers = append(pickedBrokers, candidates[p])
+ }
+ return pickedBrokers
+}
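
With MaxPartitionCount = 2520, allocateTopicPartitions divides the ring into equal ranges and lets the last partition absorb any remainder. Below is a minimal standalone sketch of that range computation only (not part of this change; the ring size of 2520 and the partition count of 3 are just example values):

package main

import "fmt"

// computeRanges mirrors the ring-division step in allocateTopicPartitions:
// equal-sized ranges, with the last range extended to cover the remainder up to ringSize.
func computeRanges(ringSize, partitionCount int32) [][2]int32 {
	rangeSize := ringSize / partitionCount
	ranges := make([][2]int32, 0, partitionCount)
	for i := int32(0); i < partitionCount; i++ {
		start, stop := i*rangeSize, (i+1)*rangeSize
		if i == partitionCount-1 {
			stop = ringSize // last partition covers the remainder
		}
		ranges = append(ranges, [2]int32{start, stop})
	}
	return ranges
}

func main() {
	// 2520 / 3 = 840, so three partitions cover [0,840) [840,1680) [1680,2520)
	fmt.Println(computeRanges(2520, 3))
}
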
diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
new file mode 100644
index 000000000..298b9ebc1
--- /dev/null
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -0,0 +1,61 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "reflect"
+ "testing"
+)
+
+func Test_allocateOneBroker(t *testing.T) {
+ brokers := cmap.New[*BrokerStats]()
+ brokers.SetIfAbsent("localhost:17777", &BrokerStats{
+ TopicPartitionCount: 0,
+ ConsumerCount: 0,
+ CpuUsagePercent: 0,
+ })
+
+ tests := []struct {
+ name string
+ args args
+ wantAssignments []*mq_pb.BrokerPartitionAssignment
+ }{
+ {
+ name: "test only one broker",
+ args: args{
+ brokers: brokers,
+ partitionCount: 1,
+ },
+ wantAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ LeaderBroker: "localhost:17777",
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: 0,
+ RangeStop: MaxPartitionCount,
+ },
+ },
+ },
+ },
+ }
+ testThem(t, tests)
+}
+
+type args struct {
+ brokers cmap.ConcurrentMap[string, *BrokerStats]
+ partitionCount int32
+}
+
+func testThem(t *testing.T, tests []struct {
+ name string
+ args args
+ wantAssignments []*mq_pb.BrokerPartitionAssignment
+}) {
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) {
+ t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments)
+ }
+ })
+ }
+}
diff --git a/weed/mq/pub_balancer/balance.go b/weed/mq/pub_balancer/balance.go
new file mode 100644
index 000000000..87fc5739b
--- /dev/null
+++ b/weed/mq/pub_balancer/balance.go
@@ -0,0 +1,73 @@
+package pub_balancer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "google.golang.org/grpc"
+)
+
+/*
+* Assume a topic is configured with [x, y] partitions when publishing, there are b brokers,
+* and p is the actual number of partitions for the topic.
+* If b <= x, then p = x.
+* If x < b < y, then x <= p <= b.
+* If b >= y, then x <= p <= y.
+
+Balance topic partitions to brokers
+===================================
+
+When the goal is to make sure that low-traffic partitions can be merged (and p >= x, and after the last rebalance interval):
+1. Calculate the average load(throughput) of partitions per topic.
+2. If any two neighboring partitions have a load that is less than the average load, merge them.
+3. If min(b, y) < p, then merge two neighboring partitions that have the least combined load.
+
+When the goal is to make sure that high-traffic partitions can be split (and p < y and p < b, and after the last rebalance interval):
+1. Calculate the average number of partitions per broker.
+2. If any partition has a load that is more than the average load, split it into two partitions.
+
+When the goal is to make sure that each broker has the same number of partitions:
+1. Calculate the average number of partitions per broker.
+2. For the brokers that have more than the average number of partitions, move the partitions to the brokers that have less than the average number of partitions.
+
+*/
+
+type BalanceAction interface {
+}
+type BalanceActionMerge struct {
+ Before []topic.TopicPartition
+ After topic.TopicPartition
+}
+type BalanceActionSplit struct {
+ Before topic.TopicPartition
+ After []topic.TopicPartition
+}
+
+type BalanceActionMove struct {
+ TopicPartition topic.TopicPartition
+ SourceBroker string
+ TargetBroker string
+}
+
+type BalanceActionCreate struct {
+ TopicPartition topic.TopicPartition
+ TargetBroker string
+}
+
+// BalancePublishers checks the stats of all brokers,
+// and balances the publishers across the brokers.
+func (balancer *Balancer) BalancePublishers() []BalanceAction {
+ action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
+ return []BalanceAction{action}
+}
+
+func (balancer *Balancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) {
+ for _, action := range actions {
+ switch action.(type) {
+ case *BalanceActionMove:
+ err = balancer.ExecuteBalanceActionMove(action.(*BalanceActionMove), grpcDialOption)
+ }
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
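
Of the goals described in the comment above, only the move action is wired up in this change (BalanceTopicPartitionOnBrokers plus ExecuteBalanceActionMove); merge and split stay as type definitions. As a hedged sketch of step 2 of the merge goal, the snippet below finds the pair of neighboring partitions with the smallest combined load, assuming per-partition loads have already been measured; partitionLoad and its values are hypothetical and not an API of this package:

package main

import (
	"fmt"
	"sort"
)

// partitionLoad is a hypothetical per-partition throughput measurement.
type partitionLoad struct {
	rangeStart, rangeStop int32
	load                  float64 // hypothetical unit, e.g. messages per second
}

// pickMergeCandidate returns the indexes of the two neighboring partitions
// with the least combined load, i.e. the best candidates to merge.
func pickMergeCandidate(parts []partitionLoad) (int, int) {
	sort.Slice(parts, func(i, j int) bool { return parts[i].rangeStart < parts[j].rangeStart })
	best, bestLoad := -1, 0.0
	for i := 0; i+1 < len(parts); i++ {
		combined := parts[i].load + parts[i+1].load
		if best < 0 || combined < bestLoad {
			best, bestLoad = i, combined
		}
	}
	return best, best + 1
}

func main() {
	parts := []partitionLoad{
		{rangeStart: 0, rangeStop: 840, load: 120},
		{rangeStart: 840, rangeStop: 1680, load: 10},
		{rangeStart: 1680, rangeStop: 2520, load: 15},
	}
	i, j := pickMergeCandidate(parts)
	fmt.Printf("merge neighboring partitions %d and %d\n", i, j) // partitions 1 and 2
}
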
diff --git a/weed/mq/pub_balancer/balance_action.go b/weed/mq/pub_balancer/balance_action.go
new file mode 100644
index 000000000..c29ec3469
--- /dev/null
+++ b/weed/mq/pub_balancer/balance_action.go
@@ -0,0 +1,58 @@
+package pub_balancer
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc"
+)
+
+// Balancer <= PublisherToPubBalancer() <= Broker <=> Publish()
+// ExecuteBalanceActionMove from Balancer => AssignTopicPartitions() => Broker => Publish()
+
+func (balancer *Balancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error {
+ if _, found := balancer.Brokers.Get(move.SourceBroker); !found {
+ return fmt.Errorf("source broker %s not found", move.SourceBroker)
+ }
+ if _, found := balancer.Brokers.Get(move.TargetBroker); !found {
+ return fmt.Errorf("target broker %s not found", move.TargetBroker)
+ }
+
+ err := pb.WithBrokerGrpcClient(false, move.TargetBroker, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.AssignTopicPartitions(context.Background(), &mq_pb.AssignTopicPartitionsRequest{
+ Topic: move.TopicPartition.Topic.ToPbTopic(),
+ BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ Partition: move.TopicPartition.ToPbPartition(),
+ },
+ },
+ IsLeader: true,
+ IsDraining: false,
+ })
+ return err
+ })
+ if err != nil {
+ return fmt.Errorf("assign topic partition %v to %s: %v", move.TopicPartition, move.TargetBroker, err)
+ }
+
+ err = pb.WithBrokerGrpcClient(false, move.SourceBroker, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.AssignTopicPartitions(context.Background(), &mq_pb.AssignTopicPartitionsRequest{
+ Topic: move.TopicPartition.Topic.ToPbTopic(),
+ BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ Partition: move.TopicPartition.ToPbPartition(),
+ },
+ },
+ IsLeader: true,
+ IsDraining: true,
+ })
+ return err
+ })
+ if err != nil {
+ return fmt.Errorf("assign topic partition %v to %s: %v", move.TopicPartition, move.SourceBroker, err)
+ }
+
+ return nil
+
+}
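
The move protocol above assigns the partition on the target broker first, then re-assigns it on the source broker with IsDraining set, so publishers are redirected before the source stops serving. A hedged usage sketch from outside the package follows; the broker addresses and topic are made-up examples, both brokers must actually be reachable over gRPC, and the insecure credentials assume a test setup:

package main

import (
	"log"

	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	balancer := pub_balancer.NewBalancer()
	// Brokers are normally registered when they connect to the balancer leader.
	balancer.OnBrokerConnected("broker1:17777")
	balancer.OnBrokerConnected("broker2:17777")

	move := &pub_balancer.BalanceActionMove{
		TopicPartition: topic.TopicPartition{
			Topic:     topic.Topic{Namespace: "test", Name: "events"},
			Partition: topic.Partition{RingSize: pub_balancer.MaxPartitionCount, RangeStart: 0, RangeStop: pub_balancer.MaxPartitionCount},
		},
		SourceBroker: "broker1:17777",
		TargetBroker: "broker2:17777",
	}
	if err := balancer.ExecuteBalanceActionMove(move, grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
		log.Println("move failed:", err)
	}
}
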
diff --git a/weed/mq/pub_balancer/balance_action_split.go b/weed/mq/pub_balancer/balance_action_split.go
new file mode 100644
index 000000000..6d317ffb9
--- /dev/null
+++ b/weed/mq/pub_balancer/balance_action_split.go
@@ -0,0 +1,43 @@
+package pub_balancer
+
+/*
+Sequence of operations to ensure ordering
+
+Assuming Publisher P10 is publishing to Topic Partition TP10, and Subscriber S10 is subscribing to Topic Partition TP10.
+After splitting Topic Partition TP10 into Topic Partition TP11 and Topic Partition TP21,
+Publisher P11 is publishing to Topic Partition TP11, and Publisher P21 is publishing to Topic Partition TP21.
+Subscriber S11 is subscribing to Topic Partition TP11, and Subscriber S21 is subscribing to Topic Partition TP21.
+
+(The last digit is the epoch generation number, which increases when the topic partitioning changes.)
+
+The diagram is as follows:
+P10 -> TP10 -> S10
+ ||
+ \/
+P11 -> TP11 -> S11
+P21 -> TP21 -> S21
+
+The following is the sequence of events:
+1. Create Topic Partition TP11 and TP21
+2. Close Publisher(s) P10
+3. Close Subscriber(s) S10
+4. Close Topic Partition TP10
+5. Start Publisher P11, P21
+6. Start Subscriber S11, S21
+
+The dependency is as follows:
+ 2 => 3 => 4
+ | |
+ v v
+ 1 => (5 | 6)
+
+And also:
+2 => 5
+3 => 6
+
+For brokers:
+1. Close all publishers for a topic partition
+2. Close all subscribers for a topic partition
+3. Close the topic partition
+
+*/
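
A hedged sketch of the event sequence above as straight-line code; every function here is hypothetical (none exist in this package), and the only point is the call order implied by the dependency graph (1 before 5 and 6, 2 before 3 and 5, 3 before 4 and 6):

package main

// Hypothetical operations; only the ordering in main matters.
func createPartitions(partitions ...string)  {} // step 1
func closePublishers(partition string)       {} // step 2
func closeSubscribers(partition string)      {} // step 3
func closePartition(partition string)        {} // step 4
func startPublishers(publishers ...string)   {} // step 5
func startSubscribers(subscribers ...string) {} // step 6

func main() {
	createPartitions("TP11", "TP21") // 1
	closePublishers("TP10")          // 2
	closeSubscribers("TP10")         // 3
	closePartition("TP10")           // 4
	startPublishers("P11", "P21")    // 5
	startSubscribers("S11", "S21")   // 6
}
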
diff --git a/weed/mq/pub_balancer/balance_brokers.go b/weed/mq/pub_balancer/balance_brokers.go
new file mode 100644
index 000000000..a6b25b7ca
--- /dev/null
+++ b/weed/mq/pub_balancer/balance_brokers.go
@@ -0,0 +1,52 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "math/rand"
+)
+
+func BalanceTopicPartitionOnBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats]) BalanceAction {
+ // 1. calculate the average number of partitions per broker
+ var totalPartitionCount int32
+ var totalBrokerCount int32
+ for brokerStats := range brokers.IterBuffered() {
+ totalBrokerCount++
+ totalPartitionCount += brokerStats.Val.TopicPartitionCount
+ }
+ averagePartitionCountPerBroker := totalPartitionCount / totalBrokerCount
+ minPartitionCountPerBroker := averagePartitionCountPerBroker
+ maxPartitionCountPerBroker := averagePartitionCountPerBroker
+ var sourceBroker, targetBroker string
+ var candidatePartition *topic.TopicPartition
+ for brokerStats := range brokers.IterBuffered() {
+ if minPartitionCountPerBroker > brokerStats.Val.TopicPartitionCount {
+ minPartitionCountPerBroker = brokerStats.Val.TopicPartitionCount
+ targetBroker = brokerStats.Key
+ }
+ if maxPartitionCountPerBroker < brokerStats.Val.TopicPartitionCount {
+ maxPartitionCountPerBroker = brokerStats.Val.TopicPartitionCount
+ sourceBroker = brokerStats.Key
+ // select a random partition from the source broker
+ randomPartitionIndex := rand.Intn(int(brokerStats.Val.TopicPartitionCount))
+ index := 0
+ for topicPartitionStats := range brokerStats.Val.TopicPartitionStats.IterBuffered() {
+ if index == randomPartitionIndex {
+ candidatePartition = &topicPartitionStats.Val.TopicPartition
+ break
+ } else {
+ index++
+ }
+ }
+ }
+ }
+ if minPartitionCountPerBroker >= maxPartitionCountPerBroker-1 {
+ return nil
+ }
+ // 2. move the partitions from the source broker to the target broker
+ return &BalanceActionMove{
+ TopicPartition: *candidatePartition,
+ SourceBroker: sourceBroker,
+ TargetBroker: targetBroker,
+ }
+}
diff --git a/weed/mq/pub_balancer/balance_brokers_test.go b/weed/mq/pub_balancer/balance_brokers_test.go
new file mode 100644
index 000000000..54667d154
--- /dev/null
+++ b/weed/mq/pub_balancer/balance_brokers_test.go
@@ -0,0 +1,75 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "reflect"
+ "testing"
+)
+
+func TestBalanceTopicPartitionOnBrokers(t *testing.T) {
+
+ brokers := cmap.New[*BrokerStats]()
+ broker1Stats := &BrokerStats{
+ TopicPartitionCount: 1,
+ ConsumerCount: 1,
+ CpuUsagePercent: 1,
+ TopicPartitionStats: cmap.New[*TopicPartitionStats](),
+ }
+ broker1Stats.TopicPartitionStats.Set("topic1:0", &TopicPartitionStats{
+ TopicPartition: topic.TopicPartition{
+ Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
+ Partition: topic.Partition{RangeStart: 0, RangeStop: 512, RingSize: 1024},
+ },
+ ConsumerCount: 1,
+ IsLeader: true,
+ })
+ broker2Stats := &BrokerStats{
+ TopicPartitionCount: 2,
+ ConsumerCount: 1,
+ CpuUsagePercent: 1,
+ TopicPartitionStats: cmap.New[*TopicPartitionStats](),
+ }
+ broker2Stats.TopicPartitionStats.Set("topic1:1", &TopicPartitionStats{
+ TopicPartition: topic.TopicPartition{
+ Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
+ Partition: topic.Partition{RangeStart: 512, RangeStop: 1024, RingSize: 1024},
+ },
+ ConsumerCount: 1,
+ IsLeader: true,
+ })
+ broker2Stats.TopicPartitionStats.Set("topic2:0", &TopicPartitionStats{
+ TopicPartition: topic.TopicPartition{
+ Topic: topic.Topic{Namespace: "topic2", Name: "topic2"},
+ Partition: topic.Partition{RangeStart: 0, RangeStop: 1024, RingSize: 1024},
+ },
+ ConsumerCount: 1,
+ IsLeader: true,
+ })
+ brokers.Set("broker1", broker1Stats)
+ brokers.Set("broker2", broker2Stats)
+
+ type args struct {
+ brokers cmap.ConcurrentMap[string, *BrokerStats]
+ }
+ tests := []struct {
+ name string
+ args args
+ want BalanceAction
+ }{
+ {
+ name: "test",
+ args: args{
+ brokers: brokers,
+ },
+ want: nil,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := BalanceTopicPartitionOnBrokers(tt.args.brokers); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("BalanceTopicPartitionOnBrokers() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go
new file mode 100644
index 000000000..988b971af
--- /dev/null
+++ b/weed/mq/pub_balancer/balancer.go
@@ -0,0 +1,83 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+const (
+ MaxPartitionCount = 8 * 9 * 5 * 7 //2520
+ LockBrokerBalancer = "broker_balancer"
+)
+
+// Balancer collects stats from all brokers.
+//
+// When a publisher wants to create a topic, the balancer picks brokers to assign the topic partitions.
+// When a consumer wants to subscribe to a topic, the balancer tells it which brokers are serving the topic partitions.
+//
+// When a partition needs to be split or merged, or a partition needs to be moved to another broker,
+// the balancer will let the broker tell the consumer instance to stop processing the partition.
+// The existing consumer instance will flush the internal state, and then stop processing.
+// Then the balancer will tell the brokers to start sending new messages in the new/moved partition to the consumer instances.
+//
+// Failover to standby consumer instances:
+//
+// A consumer group can have a min and a max number of consumer instances.
+// Consumer instances that join after the max number is reached will be in standby mode.
+//
+// When a consumer instance is down, the broker will notice this and inform the balancer.
+// The balancer will then tell the broker to send the partition to another standby consumer instance.
+type Balancer struct {
+ Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
+ // Collected from all brokers when they connect to the broker leader
+ TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
+}
+func NewBalancer() *Balancer {
+ return &Balancer{
+ Brokers: cmap.New[*BrokerStats](),
+ TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),
+ }
+}
+
+func (balancer *Balancer) OnBrokerConnected(broker string) (brokerStats *BrokerStats) {
+ var found bool
+ brokerStats, found = balancer.Brokers.Get(broker)
+ if !found {
+ brokerStats = NewBrokerStats()
+ if !balancer.Brokers.SetIfAbsent(broker, brokerStats) {
+ brokerStats, _ = balancer.Brokers.Get(broker)
+ }
+ }
+ return brokerStats
+}
+
+func (balancer *Balancer) OnBrokerDisconnected(broker string, stats *BrokerStats) {
+ balancer.Brokers.Remove(broker)
+
+ // update TopicToBrokers
+ for _, topic := range stats.Topics {
+ partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topic.String())
+ if !found {
+ continue
+ }
+ partitionSlotToBrokerList.RemoveBroker(broker)
+ }
+}
+
+func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {
+ brokerStats.UpdateStats(receivedStats)
+
+ // update TopicToBrokers
+ for _, topicPartitionStats := range receivedStats.Stats {
+ topic := topicPartitionStats.Topic
+ partition := topicPartitionStats.Partition
+ partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topic.String())
+ if !found {
+ partitionSlotToBrokerList = NewPartitionSlotToBrokerList(MaxPartitionCount)
+ if !balancer.TopicToBrokers.SetIfAbsent(topic.String(), partitionSlotToBrokerList) {
+ partitionSlotToBrokerList, _ = balancer.TopicToBrokers.Get(topic.String())
+ }
+ }
+ partitionSlotToBrokerList.AddBroker(partition, broker)
+ }
+}
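
A hedged sketch of the Balancer lifecycle hooks above, using only this package; the broker address is an example value (OnBrokerStatsUpdated is omitted because it needs a populated mq_pb.BrokerStats message from a real broker):

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
)

func main() {
	balancer := pub_balancer.NewBalancer()

	// A broker connecting to the balancer leader registers (or re-uses) its stats entry.
	stats := balancer.OnBrokerConnected("localhost:17777")
	fmt.Println(stats)

	// When the broker stream ends, its entry is removed and its topics are
	// cleared from the TopicToBrokers mapping.
	balancer.OnBrokerDisconnected("localhost:17777", stats)
	fmt.Println(balancer.Brokers.Count()) // 0
}
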
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
new file mode 100644
index 000000000..461e93c61
--- /dev/null
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -0,0 +1,72 @@
+package pub_balancer
+
+import (
+ "fmt"
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+type BrokerStats struct {
+ TopicPartitionCount int32
+ ConsumerCount int32
+ CpuUsagePercent int32
+ TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
+ Topics []topic.Topic
+}
+type TopicPartitionStats struct {
+ topic.TopicPartition
+ ConsumerCount int32
+ IsLeader bool
+}
+
+func NewBrokerStats() *BrokerStats {
+ return &BrokerStats{
+ TopicPartitionStats: cmap.New[*TopicPartitionStats](),
+ }
+}
+func (bs *BrokerStats) String() string {
+ return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
+ bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
+}
+
+func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
+ bs.TopicPartitionCount = int32(len(stats.Stats))
+ bs.CpuUsagePercent = stats.CpuUsagePercent
+
+ var consumerCount int32
+ currentTopicPartitions := bs.TopicPartitionStats.Items()
+ for _, topicPartitionStats := range stats.Stats {
+ tps := &TopicPartitionStats{
+ TopicPartition: topic.TopicPartition{
+ Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
+ Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize},
+ },
+ ConsumerCount: topicPartitionStats.ConsumerCount,
+ IsLeader: topicPartitionStats.IsLeader,
+ }
+ consumerCount += topicPartitionStats.ConsumerCount
+ key := tps.TopicPartition.String()
+ bs.TopicPartitionStats.Set(key, tps)
+ delete(currentTopicPartitions, key)
+ }
+ // remove the topic partitions that are not in the stats
+ for key := range currentTopicPartitions {
+ bs.TopicPartitionStats.Remove(key)
+ }
+ bs.ConsumerCount = consumerCount
+
+}
+
+func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) {
+ tps := &TopicPartitionStats{
+ TopicPartition: topic.TopicPartition{
+ Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
+ Partition: topic.Partition{RingSize: partition.RingSize, RangeStart: partition.RangeStart, RangeStop: partition.RangeStop},
+ },
+ ConsumerCount: 0,
+ IsLeader: true,
+ }
+ key := tps.TopicPartition.String()
+ bs.TopicPartitionStats.Set(key, tps)
+}
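
RegisterAssignment records a partition assignment on a broker before any stats report arrives, keyed by the topic partition's String() form. A hedged usage sketch; the namespace, topic name, and half-ring partition are example values:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

func main() {
	bs := pub_balancer.NewBrokerStats()
	bs.RegisterAssignment(
		&mq_pb.Topic{Namespace: "test", Name: "events"},
		&mq_pb.Partition{RingSize: pub_balancer.MaxPartitionCount, RangeStart: 0, RangeStop: pub_balancer.MaxPartitionCount / 2},
	)
	// One entry, keyed by the topic partition's String() form.
	fmt.Println(bs.TopicPartitionStats.Count()) // 1
}
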
diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go
new file mode 100644
index 000000000..3e103a650
--- /dev/null
+++ b/weed/mq/pub_balancer/lookup.go
@@ -0,0 +1,53 @@
+package pub_balancer
+
+import (
+ "errors"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+var (
+ ErrNoBroker = errors.New("no broker")
+)
+
+func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
+ if partitionCount == 0 {
+ partitionCount = 6
+ }
+ // find existing topic partition assignments
+ for brokerStatsItem := range balancer.Brokers.IterBuffered() {
+ broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
+ for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
+ topicPartitionStat := topicPartitionStatsItem.Val
+ if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
+ topicPartitionStat.TopicPartition.Name == topic.Name {
+ assignment := &mq_pb.BrokerPartitionAssignment{
+ Partition: &mq_pb.Partition{
+ RingSize: MaxPartitionCount,
+ RangeStart: topicPartitionStat.RangeStart,
+ RangeStop: topicPartitionStat.RangeStop,
+ },
+ }
+ // TODO fix follower setting
+ assignment.LeaderBroker = broker
+ assignments = append(assignments, assignment)
+ }
+ }
+ }
+ if len(assignments) > 0 && len(assignments) == int(partitionCount) || !publish {
+ glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments)
+ return assignments, nil
+ }
+
+ // find the topic partitions on the filer
+ // if the topic is not found
+ // if the request is_for_publish
+ // create the topic
+ // if the request is_for_subscribe
+ // return error not found
+ // t := topic.FromPbTopic(request.Topic)
+ if balancer.Brokers.IsEmpty() {
+ return nil, ErrNoBroker
+ }
+ return allocateTopicPartitions(balancer.Brokers, partitionCount), nil
+}
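
For a topic with no existing assignments, LookupOrAllocateTopicPartitions falls back to allocateTopicPartitions (defaulting to 6 partitions when 0 is requested), and returns ErrNoBroker when no broker is connected at all. A hedged usage sketch; the broker address and topic are example values:

package main

import (
	"errors"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

func main() {
	balancer := pub_balancer.NewBalancer()
	t := &mq_pb.Topic{Namespace: "test", Name: "events"}

	// No brokers yet: allocation for publish fails with ErrNoBroker.
	if _, err := balancer.LookupOrAllocateTopicPartitions(t, true, 0); errors.Is(err, pub_balancer.ErrNoBroker) {
		fmt.Println("no brokers available")
	}

	// With one broker connected, a publish lookup allocates partitions (6 by default).
	balancer.OnBrokerConnected("localhost:17777")
	assignments, err := balancer.LookupOrAllocateTopicPartitions(t, true, 0)
	fmt.Println(len(assignments), err) // 6 <nil>
}
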
diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go
new file mode 100644
index 000000000..7ceb2a9fc
--- /dev/null
+++ b/weed/mq/pub_balancer/partition_list_broker.go
@@ -0,0 +1,50 @@
+package pub_balancer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+type PartitionSlotToBroker struct {
+ RangeStart int32
+ RangeStop int32
+ AssignedBroker string
+}
+
+type PartitionSlotToBrokerList struct {
+ PartitionSlots []*PartitionSlotToBroker
+ RingSize int32
+}
+
+func NewPartitionSlotToBrokerList(ringSize int32) *PartitionSlotToBrokerList {
+ return &PartitionSlotToBrokerList{
+ RingSize: ringSize,
+ }
+}
+
+func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broker string) {
+ for _, partitionSlot := range ps.PartitionSlots {
+ if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop {
+ if partitionSlot.AssignedBroker == broker {
+ return
+ }
+ if partitionSlot.AssignedBroker != "" {
+ glog.V(0).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker)
+ }
+ partitionSlot.AssignedBroker = broker
+ return
+ }
+ }
+ ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ AssignedBroker: broker,
+ })
+}
+func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
+ for _, partitionSlot := range ps.PartitionSlots {
+ if partitionSlot.AssignedBroker == broker {
+ partitionSlot.AssignedBroker = ""
+ }
+ }
+}
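
AddBroker either updates the existing slot that matches the partition's [RangeStart, RangeStop) or appends a new slot, and RemoveBroker only clears the assignment while keeping the slot. A hedged usage sketch; the partition range and broker names are example values:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

func main() {
	list := pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
	p := &mq_pb.Partition{RingSize: pub_balancer.MaxPartitionCount, RangeStart: 0, RangeStop: 1260}

	list.AddBroker(p, "broker1:17777")
	list.AddBroker(p, "broker2:17777") // same slot: logs the change and reassigns
	list.RemoveBroker("broker2:17777") // slot stays, assignment is cleared

	for _, slot := range list.PartitionSlots {
		fmt.Println(slot.RangeStart, slot.RangeStop, slot.AssignedBroker)
	}
	// prints: 0 1260 (with an empty assigned broker)
}
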
diff --git a/weed/mq/pub_balancer/repair.go b/weed/mq/pub_balancer/repair.go
new file mode 100644
index 000000000..0ab1a5ea9
--- /dev/null
+++ b/weed/mq/pub_balancer/repair.go
@@ -0,0 +1,127 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "math/rand"
+ "modernc.org/mathutil"
+ "sort"
+)
+
+func (balancer *Balancer) RepairTopics() []BalanceAction {
+ action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
+ return []BalanceAction{action}
+}
+
+type TopicPartitionInfo struct {
+ Leader string
+ Followers []string
+}
+
+// RepairMissingTopicPartitions checks the stats of all brokers,
+// and repairs the missing topic partitions on the brokers.
+func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats]) (actions []BalanceAction) {
+
+ // find all topic partitions
+ topicToTopicPartitions := make(map[topic.Topic]map[topic.Partition]*TopicPartitionInfo)
+ for brokerStatsItem := range brokers.IterBuffered() {
+ broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
+ for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
+ topicPartitionStat := topicPartitionStatsItem.Val
+ topicPartitionToInfo, found := topicToTopicPartitions[topicPartitionStat.Topic]
+ if !found {
+ topicPartitionToInfo = make(map[topic.Partition]*TopicPartitionInfo)
+ topicToTopicPartitions[topicPartitionStat.Topic] = topicPartitionToInfo
+ }
+ tpi, found := topicPartitionToInfo[topicPartitionStat.Partition]
+ if !found {
+ tpi = &TopicPartitionInfo{}
+ topicPartitionToInfo[topicPartitionStat.Partition] = tpi
+ }
+ if topicPartitionStat.IsLeader {
+ tpi.Leader = broker
+ } else {
+ tpi.Followers = append(tpi.Followers, broker)
+ }
+ }
+ }
+
+ // collect all brokers as candidates
+ candidates := make([]string, 0, brokers.Count())
+ for brokerStatsItem := range brokers.IterBuffered() {
+ candidates = append(candidates, brokerStatsItem.Key)
+ }
+
+ // find the missing topic partitions
+ for t, topicPartitionToInfo := range topicToTopicPartitions {
+ missingPartitions := EachTopicRepairMissingTopicPartitions(t, topicPartitionToInfo)
+ for _, partition := range missingPartitions {
+ actions = append(actions, BalanceActionCreate{
+ TopicPartition: topic.TopicPartition{
+ Topic: t,
+ Partition: partition,
+ },
+ TargetBroker: candidates[rand.Intn(len(candidates))],
+ })
+ }
+ }
+
+ return actions
+}
+
+func EachTopicRepairMissingTopicPartitions(t topic.Topic, info map[topic.Partition]*TopicPartitionInfo) (missingPartitions []topic.Partition) {
+
+ // find the missing topic partitions
+ var partitions []topic.Partition
+ for partition := range info {
+ partitions = append(partitions, partition)
+ }
+ return findMissingPartitions(partitions, MaxPartitionCount)
+}
+
+// findMissingPartitions finds the ranges of the ring that are not covered by any existing partition
+func findMissingPartitions(partitions []topic.Partition, ringSize int32) (missingPartitions []topic.Partition) {
+ // sort the partitions by range start
+ sort.Slice(partitions, func(i, j int) bool {
+ return partitions[i].RangeStart < partitions[j].RangeStart
+ })
+
+ // calculate the average partition size
+ var covered int32
+ for _, partition := range partitions {
+ covered += partition.RangeStop - partition.RangeStart
+ }
+ averagePartitionSize := covered / int32(len(partitions))
+
+ // find the missing partitions
+ var coveredWatermark int32
+ i := 0
+ for i < len(partitions) {
+ partition := partitions[i]
+ if partition.RangeStart > coveredWatermark {
+ upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, partition.RangeStart)
+ missingPartitions = append(missingPartitions, topic.Partition{
+ RangeStart: coveredWatermark,
+ RangeStop: upperBound,
+ RingSize: ringSize,
+ })
+ coveredWatermark = upperBound
+ if coveredWatermark == partition.RangeStop {
+ i++
+ }
+ } else {
+ coveredWatermark = partition.RangeStop
+ i++
+ }
+ }
+ for coveredWatermark < ringSize {
+ upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, ringSize)
+ missingPartitions = append(missingPartitions, topic.Partition{
+ RangeStart: coveredWatermark,
+ RangeStop: upperBound,
+ RingSize: ringSize,
+ })
+ coveredWatermark = upperBound
+ }
+ return missingPartitions
+}
diff --git a/weed/mq/pub_balancer/repair_test.go b/weed/mq/pub_balancer/repair_test.go
new file mode 100644
index 000000000..08465c7e8
--- /dev/null
+++ b/weed/mq/pub_balancer/repair_test.go
@@ -0,0 +1,97 @@
+package pub_balancer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "reflect"
+ "testing"
+)
+
+func Test_findMissingPartitions(t *testing.T) {
+ type args struct {
+ partitions []topic.Partition
+ }
+ tests := []struct {
+ name string
+ args args
+ wantMissingPartitions []topic.Partition
+ }{
+ {
+ name: "one partition",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 1024},
+ },
+ },
+ wantMissingPartitions: nil,
+ },
+ {
+ name: "two partitions",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 512},
+ {RingSize: 1024, RangeStart: 512, RangeStop: 1024},
+ },
+ },
+ wantMissingPartitions: nil,
+ },
+ {
+ name: "four partitions, missing last two",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 256},
+ {RingSize: 1024, RangeStart: 256, RangeStop: 512},
+ },
+ },
+ wantMissingPartitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 512, RangeStop: 768},
+ {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
+ },
+ },
+ {
+ name: "four partitions, missing first two",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 512, RangeStop: 768},
+ {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
+ },
+ },
+ wantMissingPartitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 256},
+ {RingSize: 1024, RangeStart: 256, RangeStop: 512},
+ },
+ },
+ {
+ name: "four partitions, missing middle two",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 256},
+ {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
+ },
+ },
+ wantMissingPartitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 256, RangeStop: 512},
+ {RingSize: 1024, RangeStart: 512, RangeStop: 768},
+ },
+ },
+ {
+ name: "four partitions, missing three",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 512, RangeStop: 768},
+ },
+ },
+ wantMissingPartitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 256},
+ {RingSize: 1024, RangeStart: 256, RangeStop: 512},
+ {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if gotMissingPartitions := findMissingPartitions(tt.args.partitions, 1024); !reflect.DeepEqual(gotMissingPartitions, tt.wantMissingPartitions) {
+ t.Errorf("findMissingPartitions() = %v, want %v", gotMissingPartitions, tt.wantMissingPartitions)
+ }
+ })
+ }
+}