aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/balancer/balancer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/balancer/balancer.go')
-rw-r--r--weed/mq/balancer/balancer.go59
1 files changed, 58 insertions, 1 deletions
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go
index 74871925f..837dc0ce3 100644
--- a/weed/mq/balancer/balancer.go
+++ b/weed/mq/balancer/balancer.go
@@ -1,16 +1,67 @@
package balancer
import (
+ "fmt"
cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+const (
+ MaxPartitionCount = 8 * 9 * 5 * 7 //2520
)
type Balancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats]
}
+
type BrokerStats struct {
TopicPartitionCount int32
ConsumerCount int32
CpuUsagePercent int32
+ Stats cmap.ConcurrentMap[string, *TopicPartitionStats]
+}
+
+func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
+ bs.TopicPartitionCount = int32(len(stats.Stats))
+ bs.CpuUsagePercent = stats.CpuUsagePercent
+
+ var consumerCount int32
+ currentTopicPartitions := bs.Stats.Items()
+ for _, topicPartitionStats := range stats.Stats {
+ tps := &TopicPartitionStats{
+ TopicPartition: TopicPartition{
+ Namespace: topicPartitionStats.Topic.Namespace,
+ Topic: topicPartitionStats.Topic.Name,
+ RangeStart: topicPartitionStats.Partition.RangeStart,
+ RangeStop: topicPartitionStats.Partition.RangeStop,
+ },
+ ConsumerCount: topicPartitionStats.ConsumerCount,
+ IsLeader: topicPartitionStats.IsLeader,
+ }
+ consumerCount += topicPartitionStats.ConsumerCount
+ key := tps.TopicPartition.String()
+ bs.Stats.Set(key, tps)
+ delete(currentTopicPartitions, key)
+ }
+ // remove the topic partitions that are not in the stats
+ for key := range currentTopicPartitions {
+ bs.Stats.Remove(key)
+ }
+ bs.ConsumerCount = consumerCount
+
+}
+
+type TopicPartition struct {
+ Namespace string
+ Topic string
+ RangeStart int32
+ RangeStop int32
+}
+
+type TopicPartitionStats struct {
+ TopicPartition
+ ConsumerCount int32
+ IsLeader bool
}
func NewBalancer() *Balancer {
@@ -20,5 +71,11 @@ func NewBalancer() *Balancer {
}
func NewBrokerStats() *BrokerStats {
- return &BrokerStats{}
+ return &BrokerStats{
+ Stats: cmap.New[*TopicPartitionStats](),
+ }
+}
+
+func (tp *TopicPartition) String() string {
+ return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop)
}