diff options
Diffstat (limited to 'weed/mq/pub_balancer/balancer.go')
| -rw-r--r-- | weed/mq/pub_balancer/balancer.go | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go index 5e8c8275e..0bcbdd51b 100644 --- a/weed/mq/pub_balancer/balancer.go +++ b/weed/mq/pub_balancer/balancer.go @@ -2,6 +2,7 @@ package pub_balancer import ( cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -31,6 +32,7 @@ type Balancer struct { Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address // Collected from all brokers when they connect to the broker leader TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name + OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) } func NewBalancer() *Balancer { @@ -70,13 +72,13 @@ func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *Broke // update TopicToBrokers for _, topicPartitionStats := range receivedStats.Stats { - topic := topicPartitionStats.Topic + topicKey := topic.FromPbTopic(topicPartitionStats.Topic).String() partition := topicPartitionStats.Partition - partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topic.String()) + partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topicKey) if !found { partitionSlotToBrokerList = NewPartitionSlotToBrokerList(MaxPartitionCount) - if !balancer.TopicToBrokers.SetIfAbsent(topic.String(), partitionSlotToBrokerList) { - partitionSlotToBrokerList, _ = balancer.TopicToBrokers.Get(topic.String()) + if !balancer.TopicToBrokers.SetIfAbsent(topicKey, partitionSlotToBrokerList) { + partitionSlotToBrokerList, _ = balancer.TopicToBrokers.Get(topicKey) } } partitionSlotToBrokerList.AddBroker(partition, broker) |
