aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/balancer/balancer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/balancer/balancer.go')
-rw-r--r--weed/mq/balancer/balancer.go31
1 files changed, 27 insertions, 4 deletions
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go
index 4d9005e82..4c5f8f3c8 100644
--- a/weed/mq/balancer/balancer.go
+++ b/weed/mq/balancer/balancer.go
@@ -1,12 +1,18 @@
package balancer
-import cmap "github.com/orcaman/concurrent-map"
+import (
+ "fmt"
+ cmap "github.com/orcaman/concurrent-map/v2"
+)
type Balancer struct {
- brokers cmap.ConcurrentMap[string, *BrokerStats]
+ Brokers cmap.ConcurrentMap[string, *BrokerStats]
}
type BrokerStats struct {
- stats map[TopicPartition]*TopicPartitionStats
+ TopicPartitionCount int32
+ MessageCount int64
+ BytesCount int64
+ CpuUsagePercent int32
}
type TopicPartition struct {
@@ -16,5 +22,22 @@ type TopicPartition struct {
}
type TopicPartitionStats struct {
- Throughput int64
+ TopicPartition
+ Throughput int64
+ ConsumerCount int64
+ TopicPartitionCount int64
+}
+
+func NewBalancer() *Balancer {
+ return &Balancer{
+ Brokers: cmap.New[*BrokerStats](),
+ }
+}
+
+func NewBrokerStats() *BrokerStats {
+ return &BrokerStats{}
+}
+
+func (tp *TopicPartition) String() string {
+ return fmt.Sprintf("%v-%04d-%04d", tp.Topic, tp.RangeStart, tp.RangeStop)
}