diff options
Diffstat (limited to 'weed/mq/topic/local_manager.go')
| -rw-r--r-- | weed/mq/topic/local_manager.go | 79 |
1 files changed, 77 insertions, 2 deletions
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 99a7fc8c3..bc33fdab0 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -1,9 +1,11 @@ package topic import ( + "context" "time" cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/shirou/gopsutil/v4/cpu" @@ -11,16 +13,89 @@ import ( // LocalTopicManager manages topics on local broker type LocalTopicManager struct { - topics cmap.ConcurrentMap[string, *LocalTopic] + topics cmap.ConcurrentMap[string, *LocalTopic] + cleanupDone chan struct{} // Signal cleanup goroutine to stop + cleanupTimer *time.Ticker } // NewLocalTopicManager creates a new LocalTopicManager func NewLocalTopicManager() *LocalTopicManager { return &LocalTopicManager{ - topics: cmap.New[*LocalTopic](), + topics: cmap.New[*LocalTopic](), + cleanupDone: make(chan struct{}), } } +// StartIdlePartitionCleanup starts a background goroutine that periodically +// cleans up idle partitions (partitions with no publishers and no subscribers) +func (manager *LocalTopicManager) StartIdlePartitionCleanup(ctx context.Context, checkInterval, idleTimeout time.Duration) { + manager.cleanupTimer = time.NewTicker(checkInterval) + + go func() { + defer close(manager.cleanupDone) + defer manager.cleanupTimer.Stop() + + glog.V(1).Infof("Idle partition cleanup started: check every %v, cleanup after %v idle", checkInterval, idleTimeout) + + for { + select { + case <-ctx.Done(): + glog.V(1).Info("Idle partition cleanup stopped") + return + case <-manager.cleanupTimer.C: + manager.cleanupIdlePartitions(idleTimeout) + } + } + }() +} + +// cleanupIdlePartitions removes idle partitions from memory +func (manager *LocalTopicManager) cleanupIdlePartitions(idleTimeout time.Duration) { + cleanedCount := 0 + + // Iterate through all topics + manager.topics.IterCb(func(topicKey string, localTopic *LocalTopic) { + localTopic.partitionLock.Lock() + defer localTopic.partitionLock.Unlock() + + // Check each partition + for i := len(localTopic.Partitions) - 1; i >= 0; i-- { + partition := localTopic.Partitions[i] + + if partition.ShouldCleanup(idleTimeout) { + glog.V(1).Infof("Cleaning up idle partition %s (idle for %v, publishers=%d, subscribers=%d)", + partition.Partition.String(), + partition.GetIdleDuration(), + partition.Publishers.Size(), + partition.Subscribers.Size()) + + // Shutdown the partition (closes LogBuffer, etc.) + partition.Shutdown() + + // Remove from slice + localTopic.Partitions = append(localTopic.Partitions[:i], localTopic.Partitions[i+1:]...) + cleanedCount++ + } + } + + // If topic has no partitions left, remove it + if len(localTopic.Partitions) == 0 { + glog.V(1).Infof("Removing empty topic %s", topicKey) + manager.topics.Remove(topicKey) + } + }) + + if cleanedCount > 0 { + glog.V(0).Infof("Cleaned up %d idle partition(s)", cleanedCount) + } +} + +// WaitForCleanupShutdown waits for the cleanup goroutine to finish +func (manager *LocalTopicManager) WaitForCleanupShutdown() { + <-manager.cleanupDone + glog.V(1).Info("Idle partition cleanup shutdown complete") +} + // AddLocalPartition adds a topic to the local topic manager func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition) { localTopic, ok := manager.topics.Get(topic.String()) |
