diff options
Diffstat (limited to 'weed/mq/pub_balancer')
| -rw-r--r-- | weed/mq/pub_balancer/balance_brokers_test.go | 6 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/broker_stats.go | 28 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/repair.go | 9 |
3 files changed, 17 insertions, 26 deletions
diff --git a/weed/mq/pub_balancer/balance_brokers_test.go b/weed/mq/pub_balancer/balance_brokers_test.go index 54667d154..122984f0d 100644 --- a/weed/mq/pub_balancer/balance_brokers_test.go +++ b/weed/mq/pub_balancer/balance_brokers_test.go @@ -21,8 +21,6 @@ func TestBalanceTopicPartitionOnBrokers(t *testing.T) { Topic: topic.Topic{Namespace: "topic1", Name: "topic1"}, Partition: topic.Partition{RangeStart: 0, RangeStop: 512, RingSize: 1024}, }, - ConsumerCount: 1, - IsLeader: true, }) broker2Stats := &BrokerStats{ TopicPartitionCount: 2, @@ -35,16 +33,12 @@ func TestBalanceTopicPartitionOnBrokers(t *testing.T) { Topic: topic.Topic{Namespace: "topic1", Name: "topic1"}, Partition: topic.Partition{RangeStart: 512, RangeStop: 1024, RingSize: 1024}, }, - ConsumerCount: 1, - IsLeader: true, }) broker2Stats.TopicPartitionStats.Set("topic2:0", &TopicPartitionStats{ TopicPartition: topic.TopicPartition{ Topic: topic.Topic{Namespace: "topic2", Name: "topic2"}, Partition: topic.Partition{RangeStart: 0, RangeStop: 1024, RingSize: 1024}, }, - ConsumerCount: 1, - IsLeader: true, }) brokers.Set("broker1", broker1Stats) brokers.Set("broker2", broker2Stats) diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go index b4bb28e42..00f1f80ca 100644 --- a/weed/mq/pub_balancer/broker_stats.go +++ b/weed/mq/pub_balancer/broker_stats.go @@ -9,15 +9,16 @@ import ( type BrokerStats struct { TopicPartitionCount int32 - ConsumerCount int32 + PublisherCount int32 + SubscriberCount int32 CpuUsagePercent int32 TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition Topics []topic.Topic } type TopicPartitionStats struct { topic.TopicPartition - ConsumerCount int32 - IsLeader bool + PublisherCount int32 + SubscriberCount int32 } func NewBrokerStats() *BrokerStats { @@ -26,15 +27,15 @@ func NewBrokerStats() *BrokerStats { } } func (bs *BrokerStats) String() string { - return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}", - bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items()) + return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}", + bs.TopicPartitionCount, bs.PublisherCount, bs.SubscriberCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items()) } func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { bs.TopicPartitionCount = int32(len(stats.Stats)) bs.CpuUsagePercent = stats.CpuUsagePercent - var consumerCount int32 + var publisherCount, subscriberCount int32 currentTopicPartitions := bs.TopicPartitionStats.Items() for _, topicPartitionStats := range stats.Stats { tps := &TopicPartitionStats{ @@ -47,10 +48,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs, }, }, - ConsumerCount: topicPartitionStats.ConsumerCount, - IsLeader: topicPartitionStats.IsLeader, + PublisherCount: topicPartitionStats.PublisherCount, + SubscriberCount: topicPartitionStats.SubscriberCount, } - consumerCount += topicPartitionStats.ConsumerCount + publisherCount += topicPartitionStats.PublisherCount + subscriberCount += topicPartitionStats.SubscriberCount key := tps.TopicPartition.String() bs.TopicPartitionStats.Set(key, tps) delete(currentTopicPartitions, key) @@ -59,8 +61,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { for key := range currentTopicPartitions { bs.TopicPartitionStats.Remove(key) } - bs.ConsumerCount = consumerCount - + bs.PublisherCount = publisherCount + bs.SubscriberCount = subscriberCount } func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) { @@ -74,8 +76,8 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti UnixTimeNs: partition.UnixTimeNs, }, }, - ConsumerCount: 0, - IsLeader: true, + PublisherCount: 0, + SubscriberCount: 0, } key := tps.TopicPartition.String() if isAdd { diff --git a/weed/mq/pub_balancer/repair.go b/weed/mq/pub_balancer/repair.go index 0ab1a5ea9..0f307c9eb 100644 --- a/weed/mq/pub_balancer/repair.go +++ b/weed/mq/pub_balancer/repair.go @@ -14,8 +14,7 @@ func (balancer *Balancer) RepairTopics() []BalanceAction { } type TopicPartitionInfo struct { - Leader string - Followers []string + Broker string } // RepairMissingTopicPartitions check the stats of all brokers, @@ -38,11 +37,7 @@ func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStat tpi = &TopicPartitionInfo{} topicPartitionToInfo[topicPartitionStat.Partition] = tpi } - if topicPartitionStat.IsLeader { - tpi.Leader = broker - } else { - tpi.Followers = append(tpi.Followers, broker) - } + tpi.Broker = broker } } |
