diff options
Diffstat (limited to 'weed/mq/balancer/balancer.go')
| -rw-r--r-- | weed/mq/balancer/balancer.go | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index 4da735bd3..bc919d4e3 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -3,6 +3,7 @@ package balancer import ( "fmt" cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -30,7 +31,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { currentTopicPartitions := bs.Stats.Items() for _, topicPartitionStats := range stats.Stats { tps := &TopicPartitionStats{ - TopicPartition: TopicPartition{ + TopicPartition: topic.TopicPartition{ Namespace: topicPartitionStats.Topic.Namespace, Topic: topicPartitionStats.Topic.Name, RangeStart: topicPartitionStats.Partition.RangeStart, @@ -52,20 +53,28 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) { } +func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) { + tps := &TopicPartitionStats{ + TopicPartition: topic.TopicPartition{ + Namespace: t.Namespace, + Topic: t.Name, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + }, + ConsumerCount: 0, + IsLeader: true, + } + key := tps.TopicPartition.String() + bs.Stats.Set(key, tps) +} + func (bs *BrokerStats) String() string { return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}", bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.Stats.Items()) } -type TopicPartition struct { - Namespace string - Topic string - RangeStart int32 - RangeStop int32 -} - type TopicPartitionStats struct { - TopicPartition + topic.TopicPartition ConsumerCount int32 IsLeader bool } @@ -81,7 +90,3 @@ func NewBrokerStats() *BrokerStats { Stats: cmap.New[*TopicPartitionStats](), } } - -func (tp *TopicPartition) String() string { - return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop) -} |
