aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/pub_balancer/broker_stats.go
blob: c579c275e38cf7c93752930ee646609ea178b59a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package pub_balancer

import (
	"fmt"
	cmap "github.com/orcaman/concurrent-map/v2"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

// BrokerStats holds the statistics tracked for a single broker.
//
// Fields as maintained by the code in this file:
//   - TopicPartitionCount: number of entries in the most recent report passed
//     to UpdateStats.
//   - PublisherCount / SubscriberCount: sums of the per-entry publisher and
//     subscriber counts from that report.
//   - CpuUsagePercent: copied verbatim from that report.
//   - TopicPartitionStats: per-topic-partition statistics, keyed by the string
//     form of the topic partition (TopicPartition.String()).
//   - Topics: not written by any method in this file.
//     NOTE(review): presumably maintained elsewhere; confirm against callers.
type BrokerStats struct {
	TopicPartitionCount int32
	PublisherCount      int32
	SubscriberCount     int32
	CpuUsagePercent     int32
	TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
	Topics              []topic.Topic
}
// TopicPartitionStats holds the publisher and subscriber counts for one topic
// partition. The embedded topic.TopicPartition identifies the partition, and
// its String() form is used as the key in BrokerStats.TopicPartitionStats.
type TopicPartitionStats struct {
	topic.TopicPartition
	PublisherCount  int32
	SubscriberCount int32
}

// NewBrokerStats returns a BrokerStats with zeroed counters and a freshly
// created per-topic-partition map, ready for UpdateStats and
// RegisterAssignment.
func NewBrokerStats() *BrokerStats {
	bs := new(BrokerStats)
	bs.TopicPartitionStats = cmap.New[*TopicPartitionStats]()
	return bs
}
// String formats the aggregate counters together with a snapshot of the
// per-topic-partition map, for logging and debugging.
func (bs *BrokerStats) String() string {
	const layout = "BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}"

	// Items() copies the concurrent map into a plain map that fmt can print.
	perPartition := bs.TopicPartitionStats.Items()
	return fmt.Sprintf(layout,
		bs.TopicPartitionCount,
		bs.PublisherCount,
		bs.SubscriberCount,
		bs.CpuUsagePercent,
		perPartition,
	)
}

// UpdateStats replaces the broker's statistics with the contents of a report.
//
// CpuUsagePercent is copied from the report. Every well-formed entry of
// stats.Stats is stored in bs.TopicPartitionStats under the string form of its
// topic partition, overwriting any previous value for that key. Keys that were
// present before the call but do not appear in the report are removed.
// TopicPartitionCount, PublisherCount and SubscriberCount are then set to the
// number of recorded entries and the sums of their publisher and subscriber
// counts.
//
// A nil stats is ignored and leaves bs untouched. Entries that are nil, or
// that lack a Topic or a Partition, are skipped: no key can be derived for
// them, and dereferencing them would panic.
func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
	if stats == nil {
		return
	}
	bs.CpuUsagePercent = stats.CpuUsagePercent

	var topicPartitionCount, publisherCount, subscriberCount int32
	// Snapshot of the keys known before this report; whatever is still in it
	// after the loop was not reported and is therefore stale.
	currentTopicPartitions := bs.TopicPartitionStats.Items()
	for _, reported := range stats.Stats {
		if reported == nil || reported.Topic == nil || reported.Partition == nil {
			continue
		}
		tps := &TopicPartitionStats{
			TopicPartition: topic.TopicPartition{
				Topic: topic.Topic{Namespace: reported.Topic.Namespace, Name: reported.Topic.Name},
				Partition: topic.Partition{
					RangeStart: reported.Partition.RangeStart,
					RangeStop:  reported.Partition.RangeStop,
					RingSize:   reported.Partition.RingSize,
					UnixTimeNs: reported.Partition.UnixTimeNs,
				},
			},
			PublisherCount:  reported.PublisherCount,
			SubscriberCount: reported.SubscriberCount,
		}
		topicPartitionCount++
		publisherCount += reported.PublisherCount
		subscriberCount += reported.SubscriberCount
		key := tps.TopicPartition.String()
		bs.TopicPartitionStats.Set(key, tps)
		delete(currentTopicPartitions, key)
	}
	// remove the topic partitions that are not in the stats
	for key := range currentTopicPartitions {
		bs.TopicPartitionStats.Remove(key)
	}
	bs.TopicPartitionCount = topicPartitionCount
	bs.PublisherCount = publisherCount
	bs.SubscriberCount = subscriberCount
}

// RegisterAssignment adds or removes a topic partition in the broker's
// per-partition map.
//
// When isAdd is true, an entry with zero publisher and subscriber counts is
// inserted only if the key is not already present, so counts recorded by an
// earlier UpdateStats are not reset. When isAdd is false, the entry for the
// key is removed.
//
// The key is the string form of the topic partition built from t and
// partition. If either is nil no key can be derived, so the call is a no-op
// rather than a nil-pointer panic.
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
	if t == nil || partition == nil {
		return
	}
	tp := topic.TopicPartition{
		Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
		Partition: topic.Partition{
			RangeStart: partition.RangeStart,
			RangeStop:  partition.RangeStop,
			RingSize:   partition.RingSize,
			UnixTimeNs: partition.UnixTimeNs,
		},
	}
	key := tp.String()
	if !isAdd {
		// Removal only needs the key; no stats value has to be built.
		bs.TopicPartitionStats.Remove(key)
		return
	}
	bs.TopicPartitionStats.SetIfAbsent(key, &TopicPartitionStats{TopicPartition: tp})
}