aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/pub_balancer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/pub_balancer')
-rw-r--r--weed/mq/pub_balancer/balance_brokers_test.go6
-rw-r--r--weed/mq/pub_balancer/broker_stats.go28
-rw-r--r--weed/mq/pub_balancer/repair.go9
3 files changed, 17 insertions, 26 deletions
diff --git a/weed/mq/pub_balancer/balance_brokers_test.go b/weed/mq/pub_balancer/balance_brokers_test.go
index 54667d154..122984f0d 100644
--- a/weed/mq/pub_balancer/balance_brokers_test.go
+++ b/weed/mq/pub_balancer/balance_brokers_test.go
@@ -21,8 +21,6 @@ func TestBalanceTopicPartitionOnBrokers(t *testing.T) {
Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
Partition: topic.Partition{RangeStart: 0, RangeStop: 512, RingSize: 1024},
},
- ConsumerCount: 1,
- IsLeader: true,
})
broker2Stats := &BrokerStats{
TopicPartitionCount: 2,
@@ -35,16 +33,12 @@ func TestBalanceTopicPartitionOnBrokers(t *testing.T) {
Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
Partition: topic.Partition{RangeStart: 512, RangeStop: 1024, RingSize: 1024},
},
- ConsumerCount: 1,
- IsLeader: true,
})
broker2Stats.TopicPartitionStats.Set("topic2:0", &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
Topic: topic.Topic{Namespace: "topic2", Name: "topic2"},
Partition: topic.Partition{RangeStart: 0, RangeStop: 1024, RingSize: 1024},
},
- ConsumerCount: 1,
- IsLeader: true,
})
brokers.Set("broker1", broker1Stats)
brokers.Set("broker2", broker2Stats)
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
index b4bb28e42..00f1f80ca 100644
--- a/weed/mq/pub_balancer/broker_stats.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -9,15 +9,16 @@ import (
type BrokerStats struct {
TopicPartitionCount int32
- ConsumerCount int32
+ PublisherCount int32
+ SubscriberCount int32
CpuUsagePercent int32
TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
Topics []topic.Topic
}
type TopicPartitionStats struct {
topic.TopicPartition
- ConsumerCount int32
- IsLeader bool
+ PublisherCount int32
+ SubscriberCount int32
}
func NewBrokerStats() *BrokerStats {
@@ -26,15 +27,15 @@ func NewBrokerStats() *BrokerStats {
}
}
func (bs *BrokerStats) String() string {
- return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
- bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
+ return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}",
+ bs.TopicPartitionCount, bs.PublisherCount, bs.SubscriberCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
}
func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
bs.TopicPartitionCount = int32(len(stats.Stats))
bs.CpuUsagePercent = stats.CpuUsagePercent
- var consumerCount int32
+ var publisherCount, subscriberCount int32
currentTopicPartitions := bs.TopicPartitionStats.Items()
for _, topicPartitionStats := range stats.Stats {
tps := &TopicPartitionStats{
@@ -47,10 +48,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
},
},
- ConsumerCount: topicPartitionStats.ConsumerCount,
- IsLeader: topicPartitionStats.IsLeader,
+ PublisherCount: topicPartitionStats.PublisherCount,
+ SubscriberCount: topicPartitionStats.SubscriberCount,
}
- consumerCount += topicPartitionStats.ConsumerCount
+ publisherCount += topicPartitionStats.PublisherCount
+ subscriberCount += topicPartitionStats.SubscriberCount
key := tps.TopicPartition.String()
bs.TopicPartitionStats.Set(key, tps)
delete(currentTopicPartitions, key)
@@ -59,8 +61,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
for key := range currentTopicPartitions {
bs.TopicPartitionStats.Remove(key)
}
- bs.ConsumerCount = consumerCount
-
+ bs.PublisherCount = publisherCount
+ bs.SubscriberCount = subscriberCount
}
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
@@ -74,8 +76,8 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
UnixTimeNs: partition.UnixTimeNs,
},
},
- ConsumerCount: 0,
- IsLeader: true,
+ PublisherCount: 0,
+ SubscriberCount: 0,
}
key := tps.TopicPartition.String()
if isAdd {
diff --git a/weed/mq/pub_balancer/repair.go b/weed/mq/pub_balancer/repair.go
index 0ab1a5ea9..0f307c9eb 100644
--- a/weed/mq/pub_balancer/repair.go
+++ b/weed/mq/pub_balancer/repair.go
@@ -14,8 +14,7 @@ func (balancer *Balancer) RepairTopics() []BalanceAction {
}
type TopicPartitionInfo struct {
- Leader string
- Followers []string
+ Broker string
}
// RepairMissingTopicPartitions check the stats of all brokers,
@@ -38,11 +37,7 @@ func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStat
tpi = &TopicPartitionInfo{}
topicPartitionToInfo[topicPartitionStat.Partition] = tpi
}
- if topicPartitionStat.IsLeader {
- tpi.Leader = broker
- } else {
- tpi.Followers = append(tpi.Followers, broker)
- }
+ tpi.Broker = broker
}
}