aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/pub_balancer/broker_stats.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/pub_balancer/broker_stats.go')
-rw-r--r--weed/mq/pub_balancer/broker_stats.go28
1 files changed, 15 insertions, 13 deletions
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
index b4bb28e42..00f1f80ca 100644
--- a/weed/mq/pub_balancer/broker_stats.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -9,15 +9,16 @@ import (
type BrokerStats struct {
TopicPartitionCount int32
- ConsumerCount int32
+ PublisherCount int32
+ SubscriberCount int32
CpuUsagePercent int32
TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
Topics []topic.Topic
}
type TopicPartitionStats struct {
topic.TopicPartition
- ConsumerCount int32
- IsLeader bool
+ PublisherCount int32
+ SubscriberCount int32
}
func NewBrokerStats() *BrokerStats {
@@ -26,15 +27,15 @@ func NewBrokerStats() *BrokerStats {
}
}
func (bs *BrokerStats) String() string {
- return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
- bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
+ return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}",
+ bs.TopicPartitionCount, bs.PublisherCount, bs.SubscriberCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
}
func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
bs.TopicPartitionCount = int32(len(stats.Stats))
bs.CpuUsagePercent = stats.CpuUsagePercent
- var consumerCount int32
+ var publisherCount, subscriberCount int32
currentTopicPartitions := bs.TopicPartitionStats.Items()
for _, topicPartitionStats := range stats.Stats {
tps := &TopicPartitionStats{
@@ -47,10 +48,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
},
},
- ConsumerCount: topicPartitionStats.ConsumerCount,
- IsLeader: topicPartitionStats.IsLeader,
+ PublisherCount: topicPartitionStats.PublisherCount,
+ SubscriberCount: topicPartitionStats.SubscriberCount,
}
- consumerCount += topicPartitionStats.ConsumerCount
+ publisherCount += topicPartitionStats.PublisherCount
+ subscriberCount += topicPartitionStats.SubscriberCount
key := tps.TopicPartition.String()
bs.TopicPartitionStats.Set(key, tps)
delete(currentTopicPartitions, key)
@@ -59,8 +61,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
for key := range currentTopicPartitions {
bs.TopicPartitionStats.Remove(key)
}
- bs.ConsumerCount = consumerCount
-
+ bs.PublisherCount = publisherCount
+ bs.SubscriberCount = subscriberCount
}
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
@@ -74,8 +76,8 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
UnixTimeNs: partition.UnixTimeNs,
},
},
- ConsumerCount: 0,
- IsLeader: true,
+ PublisherCount: 0,
+ SubscriberCount: 0,
}
key := tps.TopicPartition.String()
if isAdd {