diff options
Diffstat (limited to 'weed/mq/topic/local_manager.go')
| -rw-r--r-- | weed/mq/topic/local_manager.go | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index aa2eefcdc..79a84561c 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -19,8 +19,8 @@ func NewLocalTopicManager() *LocalTopicManager { } } -// AddTopicPartition adds a topic to the local topic manager -func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) { +// AddLocalPartition adds a topic to the local topic manager +func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition) { localTopic, ok := manager.topics.Get(topic.String()) if !ok { localTopic = NewLocalTopic(topic) @@ -34,8 +34,8 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition localTopic.Partitions = append(localTopic.Partitions, localPartition) } -// GetTopicPartition gets a topic from the local topic manager -func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition { +// GetLocalPartition gets a topic from the local topic manager +func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition { localTopic, ok := manager.topics.Get(topic.String()) if !ok { return nil @@ -48,7 +48,7 @@ func (manager *LocalTopicManager) RemoveTopic(topic Topic) { manager.topics.Remove(topic.String()) } -func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) { +func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool) { localTopic, ok := manager.topics.Get(topic.String()) if !ok { return false @@ -96,8 +96,9 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br Namespace: string(localTopic.Namespace), Name: localTopic.Name, }, - Partition: localPartition.Partition.ToPbPartition(), - ConsumerCount: localPartition.ConsumerCount, + Partition: localPartition.Partition.ToPbPartition(), + PublisherCount: int32(localPartition.Publishers.Size()), + SubscriberCount: int32(localPartition.Subscribers.Size()), } // fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition) } |
