aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/pub_balancer/balancer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/pub_balancer/balancer.go')
-rw-r--r--weed/mq/pub_balancer/balancer.go10
1 files changed, 6 insertions, 4 deletions
diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go
index 5e8c8275e..0bcbdd51b 100644
--- a/weed/mq/pub_balancer/balancer.go
+++ b/weed/mq/pub_balancer/balancer.go
@@ -2,6 +2,7 @@ package pub_balancer
import (
cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
@@ -31,6 +32,7 @@ type Balancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
// Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
+ OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
}
func NewBalancer() *Balancer {
@@ -70,13 +72,13 @@ func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *Broke
// update TopicToBrokers
for _, topicPartitionStats := range receivedStats.Stats {
- topic := topicPartitionStats.Topic
+ topicKey := topic.FromPbTopic(topicPartitionStats.Topic).String()
partition := topicPartitionStats.Partition
- partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topic.String())
+ partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topicKey)
if !found {
partitionSlotToBrokerList = NewPartitionSlotToBrokerList(MaxPartitionCount)
- if !balancer.TopicToBrokers.SetIfAbsent(topic.String(), partitionSlotToBrokerList) {
- partitionSlotToBrokerList, _ = balancer.TopicToBrokers.Get(topic.String())
+ if !balancer.TopicToBrokers.SetIfAbsent(topicKey, partitionSlotToBrokerList) {
+ partitionSlotToBrokerList, _ = balancer.TopicToBrokers.Get(topicKey)
}
}
partitionSlotToBrokerList.AddBroker(partition, broker)