aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/balancer/balancer.go
blob: 4da735bd3c5ba21807d1ac261ce029bb6f7ca862 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package balancer

import (
	"fmt"
	cmap "github.com/orcaman/concurrent-map/v2"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

// Package-level constants for the balancer.
//
// MaxPartitionCount is the product 8 * 9 * 5 * 7 = 2520, which is the least
// common multiple of the integers 1 through 10.
// NOTE(review): presumably chosen so the partition range divides evenly by any
// small partition count; confirm against the code that splits ranges.
//
// LockBrokerBalancer is the literal name "broker_balancer".
// NOTE(review): presumably the name of the lock taken by the broker acting as
// balancer; the code that uses it is not visible here.
const (
	MaxPartitionCount  = 8 * 9 * 5 * 7 //2520
	LockBrokerBalancer = "broker_balancer"
)

// Balancer holds the statistics known for each broker.
//
// Brokers is a concurrent map from a string key to that broker's *BrokerStats.
// NOTE(review): the key is presumably the broker address; confirm with callers.
type Balancer struct {
	Brokers cmap.ConcurrentMap[string, *BrokerStats]
}

// BrokerStats is the balancer's view of a single broker, refreshed by
// UpdateStats.
//
// TopicPartitionCount is the number of entries in the last reported stats,
// ConsumerCount is the sum of the consumer counts over those entries, and
// CpuUsagePercent is copied verbatim from the report. Stats maps the string
// form of each TopicPartition to its TopicPartitionStats.
type BrokerStats struct {
	TopicPartitionCount int32
	ConsumerCount       int32
	CpuUsagePercent     int32
	Stats               cmap.ConcurrentMap[string, *TopicPartitionStats]
}

// UpdateStats refreshes bs from the report in stats.
//
// The partition count and CPU usage are copied from the report. Every reported
// topic partition is stored in bs.Stats under the string form of its
// TopicPartition, overwriting any entry with the same key. Entries that were in
// bs.Stats when the call started but are absent from the report are removed.
// ConsumerCount ends up as the sum of the reported per-partition consumer counts.
func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
	bs.TopicPartitionCount = int32(len(stats.Stats))
	bs.CpuUsagePercent = stats.CpuUsagePercent

	// Start from everything currently stored; keys seen in the report are
	// struck off below, so what remains afterwards is stale.
	stale := bs.Stats.Items()
	totalConsumers := int32(0)

	for _, reported := range stats.Stats {
		partition := TopicPartition{
			Namespace:  reported.Topic.Namespace,
			Topic:      reported.Topic.Name,
			RangeStart: reported.Partition.RangeStart,
			RangeStop:  reported.Partition.RangeStop,
		}
		entry := &TopicPartitionStats{
			TopicPartition: partition,
			ConsumerCount:  reported.ConsumerCount,
			IsLeader:       reported.IsLeader,
		}

		key := partition.String()
		bs.Stats.Set(key, entry)
		delete(stale, key)

		totalConsumers += reported.ConsumerCount
	}

	// Drop the topic partitions the broker no longer reports.
	for key := range stale {
		bs.Stats.Remove(key)
	}

	bs.ConsumerCount = totalConsumers
}

// String renders the broker's counters followed by the current contents of its
// per-partition stats map, for logging and debugging.
func (bs *BrokerStats) String() string {
	const layout = "BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}"

	partitions := bs.Stats.Items()
	return fmt.Sprintf(layout, bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, partitions)
}

// TopicPartition identifies one partition of a topic by namespace, topic name,
// and the RangeStart/RangeStop pair copied from the reported partition.
// Its String method produces the key used in BrokerStats.Stats.
// NOTE(review): whether RangeStop is inclusive or exclusive is not visible
// here; confirm against the partition definition.
type TopicPartition struct {
	Namespace  string
	Topic      string
	RangeStart int32
	RangeStop  int32
}

// TopicPartitionStats is the per-partition record kept for a broker: the
// embedded TopicPartition identity plus the consumer count and leader flag
// copied from the broker's report in UpdateStats.
type TopicPartitionStats struct {
	TopicPartition
	ConsumerCount int32
	IsLeader      bool
}

// NewBalancer returns a Balancer whose Brokers map has been created and is
// ready for use.
func NewBalancer() *Balancer {
	brokers := cmap.New[*BrokerStats]()
	return &Balancer{Brokers: brokers}
}

// NewBrokerStats returns a BrokerStats with zeroed counters and a newly created
// per-partition Stats map.
func NewBrokerStats() *BrokerStats {
	partitions := cmap.New[*TopicPartitionStats]()
	return &BrokerStats{Stats: partitions}
}

// String formats the partition as "<namespace>.<topic>-<start>-<stop>", with
// both range bounds zero-padded to at least four digits.
func (tp *TopicPartition) String() string {
	const layout = "%v.%v-%04d-%04d"
	return fmt.Sprintf(layout, tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop)
}