diff options
Diffstat (limited to 'weed/mq/topic/local_manager.go')
| -rw-r--r-- | weed/mq/topic/local_manager.go | 20 |
1 files changed, 18 insertions, 2 deletions
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 328684e4b..99a7fc8c3 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -6,7 +6,7 @@ import ( cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" - "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v4/cpu" ) // LocalTopicManager manages topics on local broker @@ -39,7 +39,8 @@ func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Parti if !ok { return nil } - return localTopic.findPartition(partition) + result := localTopic.findPartition(partition) + return result } // RemoveTopic removes a topic from the local topic manager @@ -71,6 +72,21 @@ func (manager *LocalTopicManager) CloseSubscribers(topic Topic, unixTsNs int64) return localTopic.closePartitionSubscribers(unixTsNs) } +// ListTopicsInMemory returns all topics currently tracked in memory +func (manager *LocalTopicManager) ListTopicsInMemory() []Topic { + var topics []Topic + for item := range manager.topics.IterBuffered() { + topics = append(topics, item.Val.Topic) + } + return topics +} + +// TopicExistsInMemory checks if a topic exists in memory (not flushed data) +func (manager *LocalTopicManager) TopicExistsInMemory(topic Topic) bool { + _, exists := manager.topics.Get(topic.String()) + return exists +} + func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats { stats := &mq_pb.BrokerStats{ Stats: make(map[string]*mq_pb.TopicPartitionStats), |
