aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_manager.go')
-rw-r--r--weed/mq/topic/local_manager.go20
1 files changed, 18 insertions, 2 deletions
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index 328684e4b..99a7fc8c3 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -6,7 +6,7 @@ import (
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "github.com/shirou/gopsutil/v3/cpu"
+ "github.com/shirou/gopsutil/v4/cpu"
)
// LocalTopicManager manages topics on local broker
@@ -39,7 +39,8 @@ func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Parti
if !ok {
return nil
}
- return localTopic.findPartition(partition)
+ result := localTopic.findPartition(partition)
+ return result
}
// RemoveTopic removes a topic from the local topic manager
@@ -71,6 +72,21 @@ func (manager *LocalTopicManager) CloseSubscribers(topic Topic, unixTsNs int64)
return localTopic.closePartitionSubscribers(unixTsNs)
}
+// ListTopicsInMemory returns all topics currently tracked in memory
+func (manager *LocalTopicManager) ListTopicsInMemory() []Topic {
+ var topics []Topic
+ for item := range manager.topics.IterBuffered() {
+ topics = append(topics, item.Val.Topic)
+ }
+ return topics
+}
+
+// TopicExistsInMemory checks if a topic exists in memory (not flushed data)
+func (manager *LocalTopicManager) TopicExistsInMemory(topic Topic) bool {
+ _, exists := manager.topics.Get(topic.String())
+ return exists
+}
+
func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats {
stats := &mq_pb.BrokerStats{
Stats: make(map[string]*mq_pb.TopicPartitionStats),