diff options
Diffstat (limited to 'weed/mq/pub_balancer/broker_stats.go')
| -rw-r--r-- | weed/mq/pub_balancer/broker_stats.go | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go index b4bb28e42..00f1f80ca 100644 --- a/weed/mq/pub_balancer/broker_stats.go +++ b/weed/mq/pub_balancer/broker_stats.go @@ -9,15 +9,16 @@ import ( type BrokerStats struct { TopicPartitionCount int32 - ConsumerCount int32 + PublisherCount int32 + SubscriberCount int32 CpuUsagePercent int32 TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition Topics []topic.Topic } type TopicPartitionStats struct { topic.TopicPartition - ConsumerCount int32 - IsLeader bool + PublisherCount int32 + SubscriberCount int32 } func NewBrokerStats() *BrokerStats { @@ -26,15 +27,15 @@ func NewBrokerStats() *BrokerStats { } } func (bs *BrokerStats) String() string { - return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}", - bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items()) + return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}", + bs.TopicPartitionCount, bs.PublisherCount, bs.SubscriberCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items()) } func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { bs.TopicPartitionCount = int32(len(stats.Stats)) bs.CpuUsagePercent = stats.CpuUsagePercent - var consumerCount int32 + var publisherCount, subscriberCount int32 currentTopicPartitions := bs.TopicPartitionStats.Items() for _, topicPartitionStats := range stats.Stats { tps := &TopicPartitionStats{ @@ -47,10 +48,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs, }, }, - ConsumerCount: topicPartitionStats.ConsumerCount, - IsLeader: topicPartitionStats.IsLeader, + PublisherCount: topicPartitionStats.PublisherCount, + SubscriberCount: topicPartitionStats.SubscriberCount, } - consumerCount += topicPartitionStats.ConsumerCount + publisherCount += topicPartitionStats.PublisherCount + subscriberCount += topicPartitionStats.SubscriberCount key := tps.TopicPartition.String() bs.TopicPartitionStats.Set(key, tps) delete(currentTopicPartitions, key) @@ -59,8 +61,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { for key := range currentTopicPartitions { bs.TopicPartitionStats.Remove(key) } - bs.ConsumerCount = consumerCount - + bs.PublisherCount = publisherCount + bs.SubscriberCount = subscriberCount } func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) { @@ -74,8 +76,8 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti UnixTimeNs: partition.UnixTimeNs, }, }, - ConsumerCount: 0, - IsLeader: true, + PublisherCount: 0, + SubscriberCount: 0, } key := tps.TopicPartition.String() if isAdd { |
