aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_manager.go')
-rw-r--r--weed/mq/topic/local_manager.go79
1 files changed, 77 insertions, 2 deletions
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index 99a7fc8c3..bc33fdab0 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -1,9 +1,11 @@
package topic
import (
+ "context"
"time"
cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/shirou/gopsutil/v4/cpu"
@@ -11,16 +13,89 @@ import (
// LocalTopicManager manages topics on local broker
type LocalTopicManager struct {
- topics cmap.ConcurrentMap[string, *LocalTopic]
+ topics cmap.ConcurrentMap[string, *LocalTopic]
+ cleanupDone chan struct{} // Signal cleanup goroutine to stop
+ cleanupTimer *time.Ticker
}
// NewLocalTopicManager creates a new LocalTopicManager
func NewLocalTopicManager() *LocalTopicManager {
return &LocalTopicManager{
- topics: cmap.New[*LocalTopic](),
+ topics: cmap.New[*LocalTopic](),
+ cleanupDone: make(chan struct{}),
}
}
+// StartIdlePartitionCleanup starts a background goroutine that periodically
+// cleans up idle partitions (partitions with no publishers and no subscribers)
+func (manager *LocalTopicManager) StartIdlePartitionCleanup(ctx context.Context, checkInterval, idleTimeout time.Duration) {
+ manager.cleanupTimer = time.NewTicker(checkInterval)
+
+ go func() {
+ defer close(manager.cleanupDone)
+ defer manager.cleanupTimer.Stop()
+
+ glog.V(1).Infof("Idle partition cleanup started: check every %v, cleanup after %v idle", checkInterval, idleTimeout)
+
+ for {
+ select {
+ case <-ctx.Done():
+ glog.V(1).Info("Idle partition cleanup stopped")
+ return
+ case <-manager.cleanupTimer.C:
+ manager.cleanupIdlePartitions(idleTimeout)
+ }
+ }
+ }()
+}
+
+// cleanupIdlePartitions removes idle partitions from memory
+func (manager *LocalTopicManager) cleanupIdlePartitions(idleTimeout time.Duration) {
+ cleanedCount := 0
+
+ // Iterate through all topics
+ manager.topics.IterCb(func(topicKey string, localTopic *LocalTopic) {
+ localTopic.partitionLock.Lock()
+ defer localTopic.partitionLock.Unlock()
+
+ // Check each partition
+ for i := len(localTopic.Partitions) - 1; i >= 0; i-- {
+ partition := localTopic.Partitions[i]
+
+ if partition.ShouldCleanup(idleTimeout) {
+ glog.V(1).Infof("Cleaning up idle partition %s (idle for %v, publishers=%d, subscribers=%d)",
+ partition.Partition.String(),
+ partition.GetIdleDuration(),
+ partition.Publishers.Size(),
+ partition.Subscribers.Size())
+
+ // Shutdown the partition (closes LogBuffer, etc.)
+ partition.Shutdown()
+
+ // Remove from slice
+ localTopic.Partitions = append(localTopic.Partitions[:i], localTopic.Partitions[i+1:]...)
+ cleanedCount++
+ }
+ }
+
+ // If topic has no partitions left, remove it
+ if len(localTopic.Partitions) == 0 {
+ glog.V(1).Infof("Removing empty topic %s", topicKey)
+ manager.topics.Remove(topicKey)
+ }
+ })
+
+ if cleanedCount > 0 {
+ glog.V(0).Infof("Cleaned up %d idle partition(s)", cleanedCount)
+ }
+}
+
+// WaitForCleanupShutdown waits for the cleanup goroutine to finish
+func (manager *LocalTopicManager) WaitForCleanupShutdown() {
+ <-manager.cleanupDone
+ glog.V(1).Info("Idle partition cleanup shutdown complete")
+}
+
// AddLocalPartition adds a topic to the local topic manager
func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition) {
localTopic, ok := manager.topics.Get(topic.String())