aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_manager.go')
-rw-r--r--weed/mq/topic/local_manager.go15
1 files changed, 8 insertions, 7 deletions
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index aa2eefcdc..79a84561c 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -19,8 +19,8 @@ func NewLocalTopicManager() *LocalTopicManager {
}
}
-// AddTopicPartition adds a topic to the local topic manager
-func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
+// AddLocalPartition adds a topic to the local topic manager
+func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition) {
localTopic, ok := manager.topics.Get(topic.String())
if !ok {
localTopic = NewLocalTopic(topic)
@@ -34,8 +34,8 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition
localTopic.Partitions = append(localTopic.Partitions, localPartition)
}
-// GetTopicPartition gets a topic from the local topic manager
-func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
+// GetLocalPartition gets a topic from the local topic manager
+func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition {
localTopic, ok := manager.topics.Get(topic.String())
if !ok {
return nil
@@ -48,7 +48,7 @@ func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
manager.topics.Remove(topic.String())
}
-func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
+func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool) {
localTopic, ok := manager.topics.Get(topic.String())
if !ok {
return false
@@ -96,8 +96,9 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
Namespace: string(localTopic.Namespace),
Name: localTopic.Name,
},
- Partition: localPartition.Partition.ToPbPartition(),
- ConsumerCount: localPartition.ConsumerCount,
+ Partition: localPartition.Partition.ToPbPartition(),
+ PublisherCount: int32(localPartition.Publishers.Size()),
+ SubscriberCount: int32(localPartition.Subscribers.Size()),
}
// fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition)
}