aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/balancer/balancer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/balancer/balancer.go')
-rw-r--r--weed/mq/balancer/balancer.go59
1 files changed, 1 insertions, 58 deletions
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go
index 837dc0ce3..74871925f 100644
--- a/weed/mq/balancer/balancer.go
+++ b/weed/mq/balancer/balancer.go
@@ -1,67 +1,16 @@
package balancer
import (
- "fmt"
cmap "github.com/orcaman/concurrent-map/v2"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-)
-
-const (
- MaxPartitionCount = 8 * 9 * 5 * 7 //2520
)
type Balancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats]
}
-
type BrokerStats struct {
TopicPartitionCount int32
ConsumerCount int32
CpuUsagePercent int32
- Stats cmap.ConcurrentMap[string, *TopicPartitionStats]
-}
-
-func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
- bs.TopicPartitionCount = int32(len(stats.Stats))
- bs.CpuUsagePercent = stats.CpuUsagePercent
-
- var consumerCount int32
- currentTopicPartitions := bs.Stats.Items()
- for _, topicPartitionStats := range stats.Stats {
- tps := &TopicPartitionStats{
- TopicPartition: TopicPartition{
- Namespace: topicPartitionStats.Topic.Namespace,
- Topic: topicPartitionStats.Topic.Name,
- RangeStart: topicPartitionStats.Partition.RangeStart,
- RangeStop: topicPartitionStats.Partition.RangeStop,
- },
- ConsumerCount: topicPartitionStats.ConsumerCount,
- IsLeader: topicPartitionStats.IsLeader,
- }
- consumerCount += topicPartitionStats.ConsumerCount
- key := tps.TopicPartition.String()
- bs.Stats.Set(key, tps)
- delete(currentTopicPartitions, key)
- }
- // remove the topic partitions that are not in the stats
- for key := range currentTopicPartitions {
- bs.Stats.Remove(key)
- }
- bs.ConsumerCount = consumerCount
-
-}
-
-type TopicPartition struct {
- Namespace string
- Topic string
- RangeStart int32
- RangeStop int32
-}
-
-type TopicPartitionStats struct {
- TopicPartition
- ConsumerCount int32
- IsLeader bool
}
func NewBalancer() *Balancer {
@@ -71,11 +20,5 @@ func NewBalancer() *Balancer {
}
func NewBrokerStats() *BrokerStats {
- return &BrokerStats{
- Stats: cmap.New[*TopicPartitionStats](),
- }
-}
-
-func (tp *TopicPartition) String() string {
- return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop)
+ return &BrokerStats{}
}