Diffstat (limited to 'weed/filer/empty_folder_cleanup/empty_folder_cleaner.go')
| -rw-r--r-- | weed/filer/empty_folder_cleanup/empty_folder_cleaner.go | 436 |
1 file changed, 436 insertions, 0 deletions
diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go new file mode 100644 index 000000000..70856aaf1 --- /dev/null +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go @@ -0,0 +1,436 @@ +package empty_folder_cleanup + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +const ( + DefaultMaxCountCheck = 1000 + DefaultCacheExpiry = 5 * time.Minute + DefaultQueueMaxSize = 1000 + DefaultQueueMaxAge = 10 * time.Minute + DefaultProcessorSleep = 10 * time.Second // How often to check queue +) + +// FilerOperations defines the filer operations needed by EmptyFolderCleaner +type FilerOperations interface { + CountDirectoryEntries(ctx context.Context, dirPath util.FullPath, limit int) (count int, err error) + DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32, ifNotModifiedAfter int64) error +} + +// folderState tracks the state of a folder for empty folder cleanup +type folderState struct { + roughCount int // Cached rough count (up to maxCountCheck) + lastAddTime time.Time // Last time an item was added + lastDelTime time.Time // Last time an item was deleted + lastCheck time.Time // Last time we checked the actual count +} + +// EmptyFolderCleaner handles asynchronous cleanup of empty folders +// Each filer owns specific folders via consistent hashing based on the peer filer list +type EmptyFolderCleaner struct { + filer FilerOperations + lockRing *lock_manager.LockRing + host pb.ServerAddress + + // Folder state tracking + mu sync.RWMutex + folderCounts map[string]*folderState // Rough count cache + + // Cleanup queue (thread-safe, has its own lock) + cleanupQueue *CleanupQueue + + // Configuration + maxCountCheck int // Max items to count (1000) + cacheExpiry time.Duration // How long to keep cache entries + processorSleep time.Duration // How often processor checks queue + bucketPath string // e.g., "/buckets" + + // Control + enabled bool + stopCh chan struct{} +} + +// NewEmptyFolderCleaner creates a new EmptyFolderCleaner +func NewEmptyFolderCleaner(filer FilerOperations, lockRing *lock_manager.LockRing, host pb.ServerAddress, bucketPath string) *EmptyFolderCleaner { + efc := &EmptyFolderCleaner{ + filer: filer, + lockRing: lockRing, + host: host, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(DefaultQueueMaxSize, DefaultQueueMaxAge), + maxCountCheck: DefaultMaxCountCheck, + cacheExpiry: DefaultCacheExpiry, + processorSleep: DefaultProcessorSleep, + bucketPath: bucketPath, + enabled: true, + stopCh: make(chan struct{}), + } + go efc.cacheEvictionLoop() + go efc.cleanupProcessor() + return efc +} + +// SetEnabled enables or disables the cleaner +func (efc *EmptyFolderCleaner) SetEnabled(enabled bool) { + efc.mu.Lock() + defer efc.mu.Unlock() + efc.enabled = enabled +} + +// IsEnabled returns whether the cleaner is enabled +func (efc *EmptyFolderCleaner) IsEnabled() bool { + efc.mu.RLock() + defer efc.mu.RUnlock() + return efc.enabled +} + +// ownsFolder checks if this filer owns the folder via consistent hashing +func (efc *EmptyFolderCleaner) ownsFolder(folder string) bool { + servers := efc.lockRing.GetSnapshot() + if len(servers) <= 1 { + return true // Single filer case 
+ } + return efc.hashKeyToServer(folder, servers) == efc.host +} + +// hashKeyToServer uses consistent hashing to map a folder to a server +func (efc *EmptyFolderCleaner) hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress { + if len(servers) == 0 { + return "" + } + x := util.HashStringToLong(key) + if x < 0 { + x = -x + } + x = x % int64(len(servers)) + return servers[x] +} + +// OnDeleteEvent is called when a file or directory is deleted +// Both file and directory deletions count towards making the parent folder empty +// eventTime is the time when the delete event occurred (for proper ordering) +func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string, isDirectory bool, eventTime time.Time) { + // Skip if not under bucket path (must be at least /buckets/<bucket>/...) + if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) { + return + } + + // Check if we own this folder + if !efc.ownsFolder(directory) { + glog.V(4).Infof("EmptyFolderCleaner: not owner of %s, skipping", directory) + return + } + + efc.mu.Lock() + defer efc.mu.Unlock() + + // Check enabled inside lock to avoid race with Stop() + if !efc.enabled { + return + } + + glog.V(3).Infof("EmptyFolderCleaner: delete event in %s/%s (isDir=%v)", directory, entryName, isDirectory) + + // Update cached count (create entry if needed) + state, exists := efc.folderCounts[directory] + if !exists { + state = &folderState{} + efc.folderCounts[directory] = state + } + if state.roughCount > 0 { + state.roughCount-- + } + state.lastDelTime = eventTime + + // Only add to cleanup queue if roughCount suggests folder might be empty + if state.roughCount > 0 { + glog.V(3).Infof("EmptyFolderCleaner: skipping queue for %s, roughCount=%d", directory, state.roughCount) + return + } + + // Add to cleanup queue with event time (handles out-of-order events) + if efc.cleanupQueue.Add(directory, eventTime) { + glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory) + } +} + +// OnCreateEvent is called when a file or directory is created +// Both file and directory creations cancel pending cleanup for the parent folder +func (efc *EmptyFolderCleaner) OnCreateEvent(directory string, entryName string, isDirectory bool) { + // Skip if not under bucket path (must be at least /buckets/<bucket>/...) 
+ if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) { + return + } + + efc.mu.Lock() + defer efc.mu.Unlock() + + // Check enabled inside lock to avoid race with Stop() + if !efc.enabled { + return + } + + // Update cached count only if already tracked (no need to track new folders) + if state, exists := efc.folderCounts[directory]; exists { + state.roughCount++ + state.lastAddTime = time.Now() + } + + // Remove from cleanup queue (cancel pending cleanup) + if efc.cleanupQueue.Remove(directory) { + glog.V(3).Infof("EmptyFolderCleaner: cancelled cleanup for %s due to new entry", directory) + } +} + +// cleanupProcessor runs in background and processes the cleanup queue +func (efc *EmptyFolderCleaner) cleanupProcessor() { + ticker := time.NewTicker(efc.processorSleep) + defer ticker.Stop() + + for { + select { + case <-efc.stopCh: + return + case <-ticker.C: + efc.processCleanupQueue() + } + } +} + +// processCleanupQueue processes items from the cleanup queue +func (efc *EmptyFolderCleaner) processCleanupQueue() { + // Check if we should process + if !efc.cleanupQueue.ShouldProcess() { + return + } + + glog.V(3).Infof("EmptyFolderCleaner: processing cleanup queue (len=%d, age=%v)", + efc.cleanupQueue.Len(), efc.cleanupQueue.OldestAge()) + + // Process all items that are ready + for efc.cleanupQueue.Len() > 0 { + // Check if still enabled + if !efc.IsEnabled() { + return + } + + // Pop the oldest item + folder, ok := efc.cleanupQueue.Pop() + if !ok { + break + } + + // Execute cleanup for this folder + efc.executeCleanup(folder) + + // If queue is no longer full and oldest item is not old enough, stop processing + if !efc.cleanupQueue.ShouldProcess() { + break + } + } +} + +// executeCleanup performs the actual cleanup of an empty folder +func (efc *EmptyFolderCleaner) executeCleanup(folder string) { + efc.mu.Lock() + + // Quick check: if we have cached count and it's > 0, skip + if state, exists := efc.folderCounts[folder]; exists { + if state.roughCount > 0 { + glog.V(3).Infof("EmptyFolderCleaner: skipping %s, cached count=%d", folder, state.roughCount) + efc.mu.Unlock() + return + } + // If there was an add after our delete, skip + if !state.lastAddTime.IsZero() && state.lastAddTime.After(state.lastDelTime) { + glog.V(3).Infof("EmptyFolderCleaner: skipping %s, add happened after delete", folder) + efc.mu.Unlock() + return + } + } + efc.mu.Unlock() + + // Re-check ownership (topology might have changed) + if !efc.ownsFolder(folder) { + glog.V(3).Infof("EmptyFolderCleaner: no longer owner of %s, skipping", folder) + return + } + + // Check if folder is actually empty (count up to maxCountCheck) + ctx := context.Background() + count, err := efc.countItems(ctx, folder) + if err != nil { + glog.V(2).Infof("EmptyFolderCleaner: error counting items in %s: %v", folder, err) + return + } + + efc.mu.Lock() + // Update cache + if _, exists := efc.folderCounts[folder]; !exists { + efc.folderCounts[folder] = &folderState{} + } + efc.folderCounts[folder].roughCount = count + efc.folderCounts[folder].lastCheck = time.Now() + efc.mu.Unlock() + + if count > 0 { + glog.V(3).Infof("EmptyFolderCleaner: folder %s has %d items, not empty", folder, count) + return + } + + // Delete the empty folder + glog.V(2).Infof("EmptyFolderCleaner: deleting empty folder %s", folder) + if err := efc.deleteFolder(ctx, folder); err != nil { + glog.V(2).Infof("EmptyFolderCleaner: failed to delete empty folder %s: %v", folder, err) + return + } + + // Clean up cache entry + efc.mu.Lock() + 
delete(efc.folderCounts, folder) + efc.mu.Unlock() + + // Note: No need to recursively check parent folder here. + // The deletion of this folder will generate a metadata event, + // which will trigger OnDeleteEvent for the parent folder. +} + +// countItems counts items in a folder (up to maxCountCheck) +func (efc *EmptyFolderCleaner) countItems(ctx context.Context, folder string) (int, error) { + return efc.filer.CountDirectoryEntries(ctx, util.FullPath(folder), efc.maxCountCheck) +} + +// deleteFolder deletes an empty folder +func (efc *EmptyFolderCleaner) deleteFolder(ctx context.Context, folder string) error { + return efc.filer.DeleteEntryMetaAndData(ctx, util.FullPath(folder), false, false, false, false, nil, 0) +} + +// isUnderPath checks if child is under parent path +func isUnderPath(child, parent string) bool { + if parent == "" || parent == "/" { + return true + } + // Ensure parent ends without slash for proper prefix matching + if len(parent) > 0 && parent[len(parent)-1] == '/' { + parent = parent[:len(parent)-1] + } + // Child must start with parent and then have a / or be exactly parent + if len(child) < len(parent) { + return false + } + if child[:len(parent)] != parent { + return false + } + if len(child) == len(parent) { + return true + } + return child[len(parent)] == '/' +} + +// isUnderBucketPath checks if directory is inside a bucket (under /buckets/<bucket>/...) +// This ensures we only clean up folders inside buckets, not the buckets themselves +func isUnderBucketPath(directory, bucketPath string) bool { + if bucketPath == "" { + return true + } + // Ensure bucketPath ends without slash + if len(bucketPath) > 0 && bucketPath[len(bucketPath)-1] == '/' { + bucketPath = bucketPath[:len(bucketPath)-1] + } + // Directory must be under bucketPath + if !isUnderPath(directory, bucketPath) { + return false + } + // Directory must be at least /buckets/<bucket>/<something> + // i.e., depth must be at least bucketPath depth + 2 + // For /buckets (depth 1), we need at least /buckets/mybucket/folder (depth 3) + bucketPathDepth := strings.Count(bucketPath, "/") + directoryDepth := strings.Count(directory, "/") + return directoryDepth >= bucketPathDepth+2 +} + +// cacheEvictionLoop periodically removes stale entries from folderCounts +func (efc *EmptyFolderCleaner) cacheEvictionLoop() { + ticker := time.NewTicker(efc.cacheExpiry) + defer ticker.Stop() + + for { + select { + case <-efc.stopCh: + return + case <-ticker.C: + efc.evictStaleCacheEntries() + } + } +} + +// evictStaleCacheEntries removes cache entries that haven't been accessed recently +func (efc *EmptyFolderCleaner) evictStaleCacheEntries() { + efc.mu.Lock() + defer efc.mu.Unlock() + + now := time.Now() + expiredCount := 0 + for folder, state := range efc.folderCounts { + // Skip if folder is in cleanup queue + if efc.cleanupQueue.Contains(folder) { + continue + } + + // Find the most recent activity time for this folder + lastActivity := state.lastCheck + if state.lastAddTime.After(lastActivity) { + lastActivity = state.lastAddTime + } + if state.lastDelTime.After(lastActivity) { + lastActivity = state.lastDelTime + } + + // Evict if no activity within cache expiry period + if now.Sub(lastActivity) > efc.cacheExpiry { + delete(efc.folderCounts, folder) + expiredCount++ + } + } + + if expiredCount > 0 { + glog.V(3).Infof("EmptyFolderCleaner: evicted %d stale cache entries", expiredCount) + } +} + +// Stop stops the cleaner and cancels all pending tasks +func (efc *EmptyFolderCleaner) Stop() { + close(efc.stopCh) + + 
efc.mu.Lock() + defer efc.mu.Unlock() + + efc.enabled = false + efc.cleanupQueue.Clear() + efc.folderCounts = make(map[string]*folderState) // Clear cache on stop +} + +// GetPendingCleanupCount returns the number of pending cleanup tasks (for testing) +func (efc *EmptyFolderCleaner) GetPendingCleanupCount() int { + return efc.cleanupQueue.Len() +} + +// GetCachedFolderCount returns the cached count for a folder (for testing) +func (efc *EmptyFolderCleaner) GetCachedFolderCount(folder string) (int, bool) { + efc.mu.RLock() + defer efc.mu.RUnlock() + if state, exists := efc.folderCounts[folder]; exists { + return state.roughCount, true + } + return 0, false +} + |
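Note: the code above calls into a CleanupQueue type (NewCleanupQueue, Add, Remove, Pop, Len, ShouldProcess, OldestAge, Contains, Clear) that is not part of this diff; it presumably lives in a sibling cleanup_queue.go. The following is only an illustrative summary of the method set implied by the call sites above; the interface name and the OldestAge return type are assumptions, not taken from the patch.

package empty_folder_cleanup

import "time"

// cleanupQueueAPI summarizes, for reference, the CleanupQueue methods that
// EmptyFolderCleaner relies on. The concrete type is defined elsewhere; its
// constructor appears to be NewCleanupQueue(maxSize int, maxAge time.Duration).
type cleanupQueueAPI interface {
	Add(folder string, eventTime time.Time) bool // enqueue folder; reports whether it was newly queued
	Remove(folder string) bool                   // cancel a pending cleanup; reports whether it was queued
	Pop() (folder string, ok bool)               // take the oldest queued folder
	Len() int                                    // number of pending folders
	ShouldProcess() bool                         // queue is full or its oldest entry is old enough
	OldestAge() time.Duration                    // age of the oldest entry (return type assumed; used only in logs)
	Contains(folder string) bool                 // whether the folder is currently queued
	Clear()                                      // drop all pending entries
}

And a minimal wiring sketch, assuming the embedding filer server already has a FilerOperations implementation, the peer LockRing, and its own address; the bucket paths and entry names below are made up for illustration.

package main

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
	"github.com/seaweedfs/seaweedfs/weed/filer/empty_folder_cleanup"
	"github.com/seaweedfs/seaweedfs/weed/pb"
)

// wireCleaner shows the expected call pattern; f, ring, and self would be
// supplied by the filer server that embeds the cleaner.
func wireCleaner(f empty_folder_cleanup.FilerOperations, ring *lock_manager.LockRing, self pb.ServerAddress) *empty_folder_cleanup.EmptyFolderCleaner {
	cleaner := empty_folder_cleanup.NewEmptyFolderCleaner(f, ring, self, "/buckets")

	// A delete inside a bucket may leave its parent empty, so the metadata
	// subscription forwards the event together with its event time.
	cleaner.OnDeleteEvent("/buckets/photos/2024", "img.jpg", false, time.Now())

	// A create in the same folder cancels any pending cleanup for it.
	cleaner.OnCreateEvent("/buckets/photos/2024", "new.jpg", false)

	return cleaner
}

func main() {} // placeholder so the sketch builds as a standalone program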
