aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/empty_folder_cleanup/cleanup_queue.go206
-rw-r--r--weed/filer/empty_folder_cleanup/cleanup_queue_test.go370
-rw-r--r--weed/filer/empty_folder_cleanup/empty_folder_cleaner.go436
-rw-r--r--weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go569
-rw-r--r--weed/filer/filer.go8
-rw-r--r--weed/filer/filer_notify.go39
-rw-r--r--weed/filer/filer_on_meta_event.go39
-rw-r--r--weed/filer/filer_search.go13
-rw-r--r--weed/s3api/s3api_object_handlers_delete.go57
9 files changed, 1685 insertions, 52 deletions
diff --git a/weed/filer/empty_folder_cleanup/cleanup_queue.go b/weed/filer/empty_folder_cleanup/cleanup_queue.go
new file mode 100644
index 000000000..66889e930
--- /dev/null
+++ b/weed/filer/empty_folder_cleanup/cleanup_queue.go
@@ -0,0 +1,206 @@
+package empty_folder_cleanup
+
+import (
+ "container/list"
+ "sync"
+ "time"
+)
+
// CleanupQueue is a deduplicated, time-ordered queue of folders that are
// waiting for an empty-folder check.
//
// Entries are kept in a doubly-linked list sorted by event time (oldest at
// the front). A companion map from folder path to list element provides
// O(1) membership tests and removals.
//
// The queue reports itself as due for processing when either:
//   - the number of entries reaches maxSize, or
//   - the oldest entry is older than maxAge.
type CleanupQueue struct {
	mu       sync.Mutex               // taken by every exported method before touching the fields below
	items    *list.List               // holds *queueItem values; front = oldest event time
	itemsMap map[string]*list.Element // folder path -> its element in items
	maxSize  int                      // entry count at which processing is triggered
	maxAge   time.Duration            // age of the oldest entry at which processing is triggered
}
+
// queueItem is the value stored in each list element of a CleanupQueue.
type queueItem struct {
	folder    string    // folder path; also the key in CleanupQueue.itemsMap
	queueTime time.Time // event time used to order the item within the queue
}
+
+// NewCleanupQueue creates a new CleanupQueue with the specified limits
+func NewCleanupQueue(maxSize int, maxAge time.Duration) *CleanupQueue {
+ return &CleanupQueue{
+ items: list.New(),
+ itemsMap: make(map[string]*list.Element),
+ maxSize: maxSize,
+ maxAge: maxAge,
+ }
+}
+
+// Add adds a folder to the queue with the specified event time.
+// The item is inserted in time-sorted order (oldest at front) to handle out-of-order events.
+// If folder already exists with an older time, the time is updated and position adjusted.
+// Returns true if the folder was newly added, false if it was updated.
+func (q *CleanupQueue) Add(folder string, eventTime time.Time) bool {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ // Check if folder already exists
+ if elem, exists := q.itemsMap[folder]; exists {
+ existingItem := elem.Value.(*queueItem)
+ // Only update if new event is later
+ if eventTime.After(existingItem.queueTime) {
+ // Remove from current position
+ q.items.Remove(elem)
+ // Re-insert with new time in sorted position
+ newElem := q.insertSorted(folder, eventTime)
+ q.itemsMap[folder] = newElem
+ }
+ return false
+ }
+
+ // Insert new folder in sorted position
+ elem := q.insertSorted(folder, eventTime)
+ q.itemsMap[folder] = elem
+ return true
+}
+
+// insertSorted inserts an item in the correct position to maintain time ordering (oldest at front)
+func (q *CleanupQueue) insertSorted(folder string, eventTime time.Time) *list.Element {
+ item := &queueItem{
+ folder: folder,
+ queueTime: eventTime,
+ }
+
+ // Find the correct position (insert before the first item with a later time)
+ for elem := q.items.Back(); elem != nil; elem = elem.Prev() {
+ existingItem := elem.Value.(*queueItem)
+ if !eventTime.Before(existingItem.queueTime) {
+ // Insert after this element
+ return q.items.InsertAfter(item, elem)
+ }
+ }
+
+ // This item is the oldest, insert at front
+ return q.items.PushFront(item)
+}
+
+// Remove removes a specific folder from the queue (e.g., when a file is created).
+// Returns true if the folder was found and removed.
+func (q *CleanupQueue) Remove(folder string) bool {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ elem, exists := q.itemsMap[folder]
+ if !exists {
+ return false
+ }
+
+ q.items.Remove(elem)
+ delete(q.itemsMap, folder)
+ return true
+}
+
+// ShouldProcess returns true if the queue should be processed.
+// This is true when:
+// - Queue size >= maxSize, OR
+// - Oldest item age > maxAge
+func (q *CleanupQueue) ShouldProcess() bool {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ return q.shouldProcessLocked()
+}
+
+// shouldProcessLocked checks if processing is needed (caller must hold lock)
+func (q *CleanupQueue) shouldProcessLocked() bool {
+ if q.items.Len() == 0 {
+ return false
+ }
+
+ // Check if queue is full
+ if q.items.Len() >= q.maxSize {
+ return true
+ }
+
+ // Check if oldest item exceeds max age
+ front := q.items.Front()
+ if front != nil {
+ item := front.Value.(*queueItem)
+ if time.Since(item.queueTime) > q.maxAge {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Pop removes and returns the oldest folder from the queue.
+// Returns the folder and true if an item was available, or empty string and false if queue is empty.
+func (q *CleanupQueue) Pop() (string, bool) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ front := q.items.Front()
+ if front == nil {
+ return "", false
+ }
+
+ item := front.Value.(*queueItem)
+ q.items.Remove(front)
+ delete(q.itemsMap, item.folder)
+
+ return item.folder, true
+}
+
+// Peek returns the oldest folder without removing it.
+// Returns the folder and queue time if available, or empty values if queue is empty.
+func (q *CleanupQueue) Peek() (folder string, queueTime time.Time, ok bool) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ front := q.items.Front()
+ if front == nil {
+ return "", time.Time{}, false
+ }
+
+ item := front.Value.(*queueItem)
+ return item.folder, item.queueTime, true
+}
+
+// Len returns the current queue size.
+func (q *CleanupQueue) Len() int {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ return q.items.Len()
+}
+
+// Contains checks if a folder is in the queue.
+func (q *CleanupQueue) Contains(folder string) bool {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ _, exists := q.itemsMap[folder]
+ return exists
+}
+
+// Clear removes all items from the queue.
+func (q *CleanupQueue) Clear() {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ q.items.Init()
+ q.itemsMap = make(map[string]*list.Element)
+}
+
+// OldestAge returns the age of the oldest item in the queue, or 0 if empty.
+func (q *CleanupQueue) OldestAge() time.Duration {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ front := q.items.Front()
+ if front == nil {
+ return 0
+ }
+
+ item := front.Value.(*queueItem)
+ return time.Since(item.queueTime)
+}
+
diff --git a/weed/filer/empty_folder_cleanup/cleanup_queue_test.go b/weed/filer/empty_folder_cleanup/cleanup_queue_test.go
new file mode 100644
index 000000000..eda1c3633
--- /dev/null
+++ b/weed/filer/empty_folder_cleanup/cleanup_queue_test.go
@@ -0,0 +1,370 @@
+package empty_folder_cleanup
+
+import (
+ "testing"
+ "time"
+)
+
+func TestCleanupQueue_Add(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ now := time.Now()
+
+ // Add first item
+ if !q.Add("/buckets/b1/folder1", now) {
+ t.Error("expected Add to return true for new item")
+ }
+ if q.Len() != 1 {
+ t.Errorf("expected len 1, got %d", q.Len())
+ }
+
+ // Add second item with later time
+ if !q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) {
+ t.Error("expected Add to return true for new item")
+ }
+ if q.Len() != 2 {
+ t.Errorf("expected len 2, got %d", q.Len())
+ }
+
+ // Add duplicate with newer time - should update and reposition
+ if q.Add("/buckets/b1/folder1", now.Add(2*time.Second)) {
+ t.Error("expected Add to return false for existing item")
+ }
+ if q.Len() != 2 {
+ t.Errorf("expected len 2 after duplicate, got %d", q.Len())
+ }
+
+ // folder1 should now be at the back (newer time) - verify by popping
+ folder1, _ := q.Pop()
+ folder2, _ := q.Pop()
+ if folder1 != "/buckets/b1/folder2" || folder2 != "/buckets/b1/folder1" {
+ t.Errorf("expected folder1 to be moved to back, got %s, %s", folder1, folder2)
+ }
+}
+
+func TestCleanupQueue_Add_OutOfOrder(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ baseTime := time.Now()
+
+ // Add items out of order
+ q.Add("/buckets/b1/folder3", baseTime.Add(3*time.Second))
+ q.Add("/buckets/b1/folder1", baseTime.Add(1*time.Second))
+ q.Add("/buckets/b1/folder2", baseTime.Add(2*time.Second))
+
+ // Items should be in time order (oldest first) - verify by popping
+ expected := []string{"/buckets/b1/folder1", "/buckets/b1/folder2", "/buckets/b1/folder3"}
+ for i, exp := range expected {
+ folder, ok := q.Pop()
+ if !ok || folder != exp {
+ t.Errorf("at index %d: expected %s, got %s", i, exp, folder)
+ }
+ }
+}
+
+func TestCleanupQueue_Add_DuplicateWithOlderTime(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ baseTime := time.Now()
+
+ // Add folder at t=5
+ q.Add("/buckets/b1/folder1", baseTime.Add(5*time.Second))
+
+ // Try to add same folder with older time - should NOT update
+ q.Add("/buckets/b1/folder1", baseTime.Add(2*time.Second))
+
+ // Time should remain at t=5
+ _, queueTime, _ := q.Peek()
+ if queueTime != baseTime.Add(5*time.Second) {
+ t.Errorf("expected time to remain unchanged, got %v", queueTime)
+ }
+}
+
+func TestCleanupQueue_Remove(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ now := time.Now()
+
+ q.Add("/buckets/b1/folder1", now)
+ q.Add("/buckets/b1/folder2", now.Add(1*time.Second))
+ q.Add("/buckets/b1/folder3", now.Add(2*time.Second))
+
+ // Remove middle item
+ if !q.Remove("/buckets/b1/folder2") {
+ t.Error("expected Remove to return true for existing item")
+ }
+ if q.Len() != 2 {
+ t.Errorf("expected len 2, got %d", q.Len())
+ }
+ if q.Contains("/buckets/b1/folder2") {
+ t.Error("removed item should not be in queue")
+ }
+
+ // Remove non-existent item
+ if q.Remove("/buckets/b1/nonexistent") {
+ t.Error("expected Remove to return false for non-existent item")
+ }
+
+ // Verify order is preserved by popping
+ folder1, _ := q.Pop()
+ folder3, _ := q.Pop()
+ if folder1 != "/buckets/b1/folder1" || folder3 != "/buckets/b1/folder3" {
+ t.Errorf("unexpected order: %s, %s", folder1, folder3)
+ }
+}
+
+func TestCleanupQueue_Pop(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ now := time.Now()
+
+ // Pop from empty queue
+ folder, ok := q.Pop()
+ if ok {
+ t.Error("expected Pop to return false for empty queue")
+ }
+ if folder != "" {
+ t.Errorf("expected empty folder, got %s", folder)
+ }
+
+ // Add items and pop in order
+ q.Add("/buckets/b1/folder1", now)
+ q.Add("/buckets/b1/folder2", now.Add(1*time.Second))
+ q.Add("/buckets/b1/folder3", now.Add(2*time.Second))
+
+ folder, ok = q.Pop()
+ if !ok || folder != "/buckets/b1/folder1" {
+ t.Errorf("expected folder1, got %s (ok=%v)", folder, ok)
+ }
+
+ folder, ok = q.Pop()
+ if !ok || folder != "/buckets/b1/folder2" {
+ t.Errorf("expected folder2, got %s (ok=%v)", folder, ok)
+ }
+
+ folder, ok = q.Pop()
+ if !ok || folder != "/buckets/b1/folder3" {
+ t.Errorf("expected folder3, got %s (ok=%v)", folder, ok)
+ }
+
+ // Queue should be empty now
+ if q.Len() != 0 {
+ t.Errorf("expected empty queue, got len %d", q.Len())
+ }
+}
+
+func TestCleanupQueue_Peek(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ now := time.Now()
+
+ // Peek empty queue
+ folder, _, ok := q.Peek()
+ if ok {
+ t.Error("expected Peek to return false for empty queue")
+ }
+
+ // Add item and peek
+ q.Add("/buckets/b1/folder1", now)
+ folder, queueTime, ok := q.Peek()
+ if !ok || folder != "/buckets/b1/folder1" {
+ t.Errorf("expected folder1, got %s (ok=%v)", folder, ok)
+ }
+ if queueTime != now {
+ t.Errorf("expected queue time %v, got %v", now, queueTime)
+ }
+
+ // Peek should not remove item
+ if q.Len() != 1 {
+ t.Errorf("Peek should not remove item, len=%d", q.Len())
+ }
+}
+
+func TestCleanupQueue_Contains(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ now := time.Now()
+
+ q.Add("/buckets/b1/folder1", now)
+
+ if !q.Contains("/buckets/b1/folder1") {
+ t.Error("expected Contains to return true")
+ }
+ if q.Contains("/buckets/b1/folder2") {
+ t.Error("expected Contains to return false for non-existent")
+ }
+}
+
+func TestCleanupQueue_ShouldProcess_MaxSize(t *testing.T) {
+ q := NewCleanupQueue(3, 10*time.Minute)
+ now := time.Now()
+
+ // Empty queue
+ if q.ShouldProcess() {
+ t.Error("empty queue should not need processing")
+ }
+
+ // Add items below max
+ q.Add("/buckets/b1/folder1", now)
+ q.Add("/buckets/b1/folder2", now.Add(1*time.Second))
+ if q.ShouldProcess() {
+ t.Error("queue below max should not need processing")
+ }
+
+ // Add item to reach max
+ q.Add("/buckets/b1/folder3", now.Add(2*time.Second))
+ if !q.ShouldProcess() {
+ t.Error("queue at max should need processing")
+ }
+}
+
+func TestCleanupQueue_ShouldProcess_MaxAge(t *testing.T) {
+ q := NewCleanupQueue(100, 100*time.Millisecond) // Short max age for testing
+
+ // Add item with old event time
+ oldTime := time.Now().Add(-1 * time.Second) // 1 second ago
+ q.Add("/buckets/b1/folder1", oldTime)
+
+ // Item is older than maxAge, should need processing
+ if !q.ShouldProcess() {
+ t.Error("old item should trigger processing")
+ }
+
+ // Clear and add fresh item
+ q.Clear()
+ q.Add("/buckets/b1/folder2", time.Now())
+
+ // Fresh item should not trigger processing
+ if q.ShouldProcess() {
+ t.Error("fresh item should not trigger processing")
+ }
+}
+
+func TestCleanupQueue_Clear(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ now := time.Now()
+
+ q.Add("/buckets/b1/folder1", now)
+ q.Add("/buckets/b1/folder2", now.Add(1*time.Second))
+ q.Add("/buckets/b1/folder3", now.Add(2*time.Second))
+
+ q.Clear()
+
+ if q.Len() != 0 {
+ t.Errorf("expected empty queue after Clear, got len %d", q.Len())
+ }
+ if q.Contains("/buckets/b1/folder1") {
+ t.Error("queue should not contain items after Clear")
+ }
+}
+
+func TestCleanupQueue_OldestAge(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+
+ // Empty queue
+ if q.OldestAge() != 0 {
+ t.Error("empty queue should have zero oldest age")
+ }
+
+ // Add item with time in the past
+ oldTime := time.Now().Add(-5 * time.Minute)
+ q.Add("/buckets/b1/folder1", oldTime)
+
+ // Age should be approximately 5 minutes
+ age := q.OldestAge()
+ if age < 4*time.Minute || age > 6*time.Minute {
+ t.Errorf("expected ~5m age, got %v", age)
+ }
+}
+
+func TestCleanupQueue_TimeOrder(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ baseTime := time.Now()
+
+ // Add items in order
+ items := []string{
+ "/buckets/b1/a",
+ "/buckets/b1/b",
+ "/buckets/b1/c",
+ "/buckets/b1/d",
+ "/buckets/b1/e",
+ }
+ for i, item := range items {
+ q.Add(item, baseTime.Add(time.Duration(i)*time.Second))
+ }
+
+ // Pop should return in time order
+ for i, expected := range items {
+ got, ok := q.Pop()
+ if !ok {
+ t.Errorf("Pop %d: expected item, got empty", i)
+ }
+ if got != expected {
+ t.Errorf("Pop %d: expected %s, got %s", i, expected, got)
+ }
+ }
+}
+
+func TestCleanupQueue_DuplicateWithNewerTime(t *testing.T) {
+ q := NewCleanupQueue(100, 10*time.Minute)
+ baseTime := time.Now()
+
+ // Add items
+ q.Add("/buckets/b1/folder1", baseTime)
+ q.Add("/buckets/b1/folder2", baseTime.Add(1*time.Second))
+ q.Add("/buckets/b1/folder3", baseTime.Add(2*time.Second))
+
+ // Add duplicate with newer time - should update and reposition
+ q.Add("/buckets/b1/folder1", baseTime.Add(3*time.Second))
+
+ // folder1 should now be at the back (newest time) - verify by popping
+ expected := []string{"/buckets/b1/folder2", "/buckets/b1/folder3", "/buckets/b1/folder1"}
+ for i, exp := range expected {
+ folder, ok := q.Pop()
+ if !ok || folder != exp {
+ t.Errorf("at index %d: expected %s, got %s", i, exp, folder)
+ }
+ }
+}
+
+func TestCleanupQueue_Concurrent(t *testing.T) {
+ q := NewCleanupQueue(1000, 10*time.Minute)
+ done := make(chan bool)
+ now := time.Now()
+
+ // Concurrent adds
+ go func() {
+ for i := 0; i < 100; i++ {
+ q.Add("/buckets/b1/folder"+string(rune('A'+i%26)), now.Add(time.Duration(i)*time.Millisecond))
+ }
+ done <- true
+ }()
+
+ // Concurrent removes
+ go func() {
+ for i := 0; i < 50; i++ {
+ q.Remove("/buckets/b1/folder" + string(rune('A'+i%26)))
+ }
+ done <- true
+ }()
+
+ // Concurrent pops
+ go func() {
+ for i := 0; i < 30; i++ {
+ q.Pop()
+ }
+ done <- true
+ }()
+
+ // Concurrent reads
+ go func() {
+ for i := 0; i < 100; i++ {
+ q.Len()
+ q.Contains("/buckets/b1/folderA")
+ q.ShouldProcess()
+ }
+ done <- true
+ }()
+
+ // Wait for all goroutines
+ for i := 0; i < 4; i++ {
+ <-done
+ }
+
+ // Just verify no panic occurred and queue is in consistent state
+ _ = q.Len()
+}
+
diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
new file mode 100644
index 000000000..70856aaf1
--- /dev/null
+++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
@@ -0,0 +1,436 @@
+package empty_folder_cleanup
+
+import (
+ "context"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
// Defaults applied by NewEmptyFolderCleaner.
const (
	// DefaultMaxCountCheck is the limit passed when counting a folder's entries.
	DefaultMaxCountCheck = 1000
	// DefaultCacheExpiry is the idle time after which a cached folder state is evicted.
	DefaultCacheExpiry = 5 * time.Minute
	// DefaultQueueMaxSize is the queue length that triggers processing.
	DefaultQueueMaxSize = 1000
	// DefaultQueueMaxAge is the age of the oldest queued folder that triggers processing.
	DefaultQueueMaxAge = 10 * time.Minute
	// DefaultProcessorSleep is how often the background processor checks the queue.
	DefaultProcessorSleep = 10 * time.Second
)
+
+// FilerOperations defines the filer operations needed by EmptyFolderCleaner
+type FilerOperations interface {
+ CountDirectoryEntries(ctx context.Context, dirPath util.FullPath, limit int) (count int, err error)
+ DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32, ifNotModifiedAfter int64) error
+}
+
// folderState is the per-folder bookkeeping kept by EmptyFolderCleaner.
type folderState struct {
	roughCount  int       // approximate entry count: adjusted by events, reset by real counts (capped at maxCountCheck)
	lastAddTime time.Time // when the most recent create event was recorded
	lastDelTime time.Time // event time of the most recent delete event
	lastCheck   time.Time // when the folder's entries were last actually counted
}
+
+// EmptyFolderCleaner handles asynchronous cleanup of empty folders
+// Each filer owns specific folders via consistent hashing based on the peer filer list
+type EmptyFolderCleaner struct {
+ filer FilerOperations
+ lockRing *lock_manager.LockRing
+ host pb.ServerAddress
+
+ // Folder state tracking
+ mu sync.RWMutex
+ folderCounts map[string]*folderState // Rough count cache
+
+ // Cleanup queue (thread-safe, has its own lock)
+ cleanupQueue *CleanupQueue
+
+ // Configuration
+ maxCountCheck int // Max items to count (1000)
+ cacheExpiry time.Duration // How long to keep cache entries
+ processorSleep time.Duration // How often processor checks queue
+ bucketPath string // e.g., "/buckets"
+
+ // Control
+ enabled bool
+ stopCh chan struct{}
+}
+
+// NewEmptyFolderCleaner creates a new EmptyFolderCleaner
+func NewEmptyFolderCleaner(filer FilerOperations, lockRing *lock_manager.LockRing, host pb.ServerAddress, bucketPath string) *EmptyFolderCleaner {
+ efc := &EmptyFolderCleaner{
+ filer: filer,
+ lockRing: lockRing,
+ host: host,
+ folderCounts: make(map[string]*folderState),
+ cleanupQueue: NewCleanupQueue(DefaultQueueMaxSize, DefaultQueueMaxAge),
+ maxCountCheck: DefaultMaxCountCheck,
+ cacheExpiry: DefaultCacheExpiry,
+ processorSleep: DefaultProcessorSleep,
+ bucketPath: bucketPath,
+ enabled: true,
+ stopCh: make(chan struct{}),
+ }
+ go efc.cacheEvictionLoop()
+ go efc.cleanupProcessor()
+ return efc
+}
+
+// SetEnabled enables or disables the cleaner
+func (efc *EmptyFolderCleaner) SetEnabled(enabled bool) {
+ efc.mu.Lock()
+ defer efc.mu.Unlock()
+ efc.enabled = enabled
+}
+
+// IsEnabled returns whether the cleaner is enabled
+func (efc *EmptyFolderCleaner) IsEnabled() bool {
+ efc.mu.RLock()
+ defer efc.mu.RUnlock()
+ return efc.enabled
+}
+
+// ownsFolder checks if this filer owns the folder via consistent hashing
+func (efc *EmptyFolderCleaner) ownsFolder(folder string) bool {
+ servers := efc.lockRing.GetSnapshot()
+ if len(servers) <= 1 {
+ return true // Single filer case
+ }
+ return efc.hashKeyToServer(folder, servers) == efc.host
+}
+
+// hashKeyToServer uses consistent hashing to map a folder to a server
+func (efc *EmptyFolderCleaner) hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
+ if len(servers) == 0 {
+ return ""
+ }
+ x := util.HashStringToLong(key)
+ if x < 0 {
+ x = -x
+ }
+ x = x % int64(len(servers))
+ return servers[x]
+}
+
+// OnDeleteEvent is called when a file or directory is deleted
+// Both file and directory deletions count towards making the parent folder empty
+// eventTime is the time when the delete event occurred (for proper ordering)
+func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string, isDirectory bool, eventTime time.Time) {
+ // Skip if not under bucket path (must be at least /buckets/<bucket>/...)
+ if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) {
+ return
+ }
+
+ // Check if we own this folder
+ if !efc.ownsFolder(directory) {
+ glog.V(4).Infof("EmptyFolderCleaner: not owner of %s, skipping", directory)
+ return
+ }
+
+ efc.mu.Lock()
+ defer efc.mu.Unlock()
+
+ // Check enabled inside lock to avoid race with Stop()
+ if !efc.enabled {
+ return
+ }
+
+ glog.V(3).Infof("EmptyFolderCleaner: delete event in %s/%s (isDir=%v)", directory, entryName, isDirectory)
+
+ // Update cached count (create entry if needed)
+ state, exists := efc.folderCounts[directory]
+ if !exists {
+ state = &folderState{}
+ efc.folderCounts[directory] = state
+ }
+ if state.roughCount > 0 {
+ state.roughCount--
+ }
+ state.lastDelTime = eventTime
+
+ // Only add to cleanup queue if roughCount suggests folder might be empty
+ if state.roughCount > 0 {
+ glog.V(3).Infof("EmptyFolderCleaner: skipping queue for %s, roughCount=%d", directory, state.roughCount)
+ return
+ }
+
+ // Add to cleanup queue with event time (handles out-of-order events)
+ if efc.cleanupQueue.Add(directory, eventTime) {
+ glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory)
+ }
+}
+
+// OnCreateEvent is called when a file or directory is created
+// Both file and directory creations cancel pending cleanup for the parent folder
+func (efc *EmptyFolderCleaner) OnCreateEvent(directory string, entryName string, isDirectory bool) {
+ // Skip if not under bucket path (must be at least /buckets/<bucket>/...)
+ if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) {
+ return
+ }
+
+ efc.mu.Lock()
+ defer efc.mu.Unlock()
+
+ // Check enabled inside lock to avoid race with Stop()
+ if !efc.enabled {
+ return
+ }
+
+ // Update cached count only if already tracked (no need to track new folders)
+ if state, exists := efc.folderCounts[directory]; exists {
+ state.roughCount++
+ state.lastAddTime = time.Now()
+ }
+
+ // Remove from cleanup queue (cancel pending cleanup)
+ if efc.cleanupQueue.Remove(directory) {
+ glog.V(3).Infof("EmptyFolderCleaner: cancelled cleanup for %s due to new entry", directory)
+ }
+}
+
+// cleanupProcessor runs in background and processes the cleanup queue
+func (efc *EmptyFolderCleaner) cleanupProcessor() {
+ ticker := time.NewTicker(efc.processorSleep)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-efc.stopCh:
+ return
+ case <-ticker.C:
+ efc.processCleanupQueue()
+ }
+ }
+}
+
+// processCleanupQueue processes items from the cleanup queue
+func (efc *EmptyFolderCleaner) processCleanupQueue() {
+ // Check if we should process
+ if !efc.cleanupQueue.ShouldProcess() {
+ return
+ }
+
+ glog.V(3).Infof("EmptyFolderCleaner: processing cleanup queue (len=%d, age=%v)",
+ efc.cleanupQueue.Len(), efc.cleanupQueue.OldestAge())
+
+ // Process all items that are ready
+ for efc.cleanupQueue.Len() > 0 {
+ // Check if still enabled
+ if !efc.IsEnabled() {
+ return
+ }
+
+ // Pop the oldest item
+ folder, ok := efc.cleanupQueue.Pop()
+ if !ok {
+ break
+ }
+
+ // Execute cleanup for this folder
+ efc.executeCleanup(folder)
+
+ // If queue is no longer full and oldest item is not old enough, stop processing
+ if !efc.cleanupQueue.ShouldProcess() {
+ break
+ }
+ }
+}
+
+// executeCleanup performs the actual cleanup of an empty folder
+func (efc *EmptyFolderCleaner) executeCleanup(folder string) {
+ efc.mu.Lock()
+
+ // Quick check: if we have cached count and it's > 0, skip
+ if state, exists := efc.folderCounts[folder]; exists {
+ if state.roughCount > 0 {
+ glog.V(3).Infof("EmptyFolderCleaner: skipping %s, cached count=%d", folder, state.roughCount)
+ efc.mu.Unlock()
+ return
+ }
+ // If there was an add after our delete, skip
+ if !state.lastAddTime.IsZero() && state.lastAddTime.After(state.lastDelTime) {
+ glog.V(3).Infof("EmptyFolderCleaner: skipping %s, add happened after delete", folder)
+ efc.mu.Unlock()
+ return
+ }
+ }
+ efc.mu.Unlock()
+
+ // Re-check ownership (topology might have changed)
+ if !efc.ownsFolder(folder) {
+ glog.V(3).Infof("EmptyFolderCleaner: no longer owner of %s, skipping", folder)
+ return
+ }
+
+ // Check if folder is actually empty (count up to maxCountCheck)
+ ctx := context.Background()
+ count, err := efc.countItems(ctx, folder)
+ if err != nil {
+ glog.V(2).Infof("EmptyFolderCleaner: error counting items in %s: %v", folder, err)
+ return
+ }
+
+ efc.mu.Lock()
+ // Update cache
+ if _, exists := efc.folderCounts[folder]; !exists {
+ efc.folderCounts[folder] = &folderState{}
+ }
+ efc.folderCounts[folder].roughCount = count
+ efc.folderCounts[folder].lastCheck = time.Now()
+ efc.mu.Unlock()
+
+ if count > 0 {
+ glog.V(3).Infof("EmptyFolderCleaner: folder %s has %d items, not empty", folder, count)
+ return
+ }
+
+ // Delete the empty folder
+ glog.V(2).Infof("EmptyFolderCleaner: deleting empty folder %s", folder)
+ if err := efc.deleteFolder(ctx, folder); err != nil {
+ glog.V(2).Infof("EmptyFolderCleaner: failed to delete empty folder %s: %v", folder, err)
+ return
+ }
+
+ // Clean up cache entry
+ efc.mu.Lock()
+ delete(efc.folderCounts, folder)
+ efc.mu.Unlock()
+
+ // Note: No need to recursively check parent folder here.
+ // The deletion of this folder will generate a metadata event,
+ // which will trigger OnDeleteEvent for the parent folder.
+}
+
+// countItems counts items in a folder (up to maxCountCheck)
+func (efc *EmptyFolderCleaner) countItems(ctx context.Context, folder string) (int, error) {
+ return efc.filer.CountDirectoryEntries(ctx, util.FullPath(folder), efc.maxCountCheck)
+}
+
+// deleteFolder deletes an empty folder
+func (efc *EmptyFolderCleaner) deleteFolder(ctx context.Context, folder string) error {
+ return efc.filer.DeleteEntryMetaAndData(ctx, util.FullPath(folder), false, false, false, false, nil, 0)
+}
+
// isUnderPath reports whether child equals parent or lies below it in the
// path hierarchy. An empty or "/" parent contains every path. One trailing
// slash on parent is ignored. A mere string prefix does not count:
// "/buckets-other" is not under "/buckets".
func isUnderPath(child, parent string) bool {
	if parent == "" || parent == "/" {
		return true
	}
	// Compare against the parent without its trailing slash.
	parent = strings.TrimSuffix(parent, "/")
	if !strings.HasPrefix(child, parent) {
		return false
	}
	// Either an exact match, or the prefix must end at a path separator.
	return len(child) == len(parent) || child[len(parent)] == '/'
}

// isUnderBucketPath reports whether directory is strictly inside a bucket,
// i.e. at least <bucketPath>/<bucket>/<something>. The bucket root and the
// bucket directories themselves are rejected so that they are never treated
// as cleanup candidates. An empty bucketPath accepts every directory.
//
// Trailing slashes on directory are ignored, so "/buckets/mybucket/" is
// treated as the bucket directory "/buckets/mybucket" rather than as a
// folder inside it.
func isUnderBucketPath(directory, bucketPath string) bool {
	if bucketPath == "" {
		return true
	}
	bucketPath = strings.TrimSuffix(bucketPath, "/")
	// Normalise first: the depth check below counts separators, and a
	// trailing one would otherwise be counted as an extra level.
	directory = strings.TrimRight(directory, "/")

	if !isUnderPath(directory, bucketPath) {
		return false
	}
	// Depth must be at least bucketPath depth + 2:
	// for /buckets (depth 1) we need at least /buckets/mybucket/folder (depth 3).
	bucketPathDepth := strings.Count(bucketPath, "/")
	directoryDepth := strings.Count(directory, "/")
	return directoryDepth >= bucketPathDepth+2
}
+
+// cacheEvictionLoop periodically removes stale entries from folderCounts
+func (efc *EmptyFolderCleaner) cacheEvictionLoop() {
+ ticker := time.NewTicker(efc.cacheExpiry)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-efc.stopCh:
+ return
+ case <-ticker.C:
+ efc.evictStaleCacheEntries()
+ }
+ }
+}
+
+// evictStaleCacheEntries removes cache entries that haven't been accessed recently
+func (efc *EmptyFolderCleaner) evictStaleCacheEntries() {
+ efc.mu.Lock()
+ defer efc.mu.Unlock()
+
+ now := time.Now()
+ expiredCount := 0
+ for folder, state := range efc.folderCounts {
+ // Skip if folder is in cleanup queue
+ if efc.cleanupQueue.Contains(folder) {
+ continue
+ }
+
+ // Find the most recent activity time for this folder
+ lastActivity := state.lastCheck
+ if state.lastAddTime.After(lastActivity) {
+ lastActivity = state.lastAddTime
+ }
+ if state.lastDelTime.After(lastActivity) {
+ lastActivity = state.lastDelTime
+ }
+
+ // Evict if no activity within cache expiry period
+ if now.Sub(lastActivity) > efc.cacheExpiry {
+ delete(efc.folderCounts, folder)
+ expiredCount++
+ }
+ }
+
+ if expiredCount > 0 {
+ glog.V(3).Infof("EmptyFolderCleaner: evicted %d stale cache entries", expiredCount)
+ }
+}
+
+// Stop stops the cleaner and cancels all pending tasks
+func (efc *EmptyFolderCleaner) Stop() {
+ close(efc.stopCh)
+
+ efc.mu.Lock()
+ defer efc.mu.Unlock()
+
+ efc.enabled = false
+ efc.cleanupQueue.Clear()
+ efc.folderCounts = make(map[string]*folderState) // Clear cache on stop
+}
+
+// GetPendingCleanupCount returns the number of pending cleanup tasks (for testing)
+func (efc *EmptyFolderCleaner) GetPendingCleanupCount() int {
+ return efc.cleanupQueue.Len()
+}
+
+// GetCachedFolderCount returns the cached count for a folder (for testing)
+func (efc *EmptyFolderCleaner) GetCachedFolderCount(folder string) (int, bool) {
+ efc.mu.RLock()
+ defer efc.mu.RUnlock()
+ if state, exists := efc.folderCounts[folder]; exists {
+ return state.roughCount, true
+ }
+ return 0, false
+}
+
diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
new file mode 100644
index 000000000..fbc05ccf8
--- /dev/null
+++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
@@ -0,0 +1,569 @@
+package empty_folder_cleanup
+
+import (
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+)
+
+func Test_isUnderPath(t *testing.T) {
+ tests := []struct {
+ name string
+ child string
+ parent string
+ expected bool
+ }{
+ {"child under parent", "/buckets/mybucket/folder/file.txt", "/buckets", true},
+ {"child is parent", "/buckets", "/buckets", true},
+ {"child not under parent", "/other/path", "/buckets", false},
+ {"empty parent", "/any/path", "", true},
+ {"root parent", "/any/path", "/", true},
+ {"parent with trailing slash", "/buckets/mybucket", "/buckets/", true},
+ {"similar prefix but not under", "/buckets-other/file", "/buckets", false},
+ {"deeply nested", "/buckets/a/b/c/d/e/f", "/buckets/a/b", true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := isUnderPath(tt.child, tt.parent)
+ if result != tt.expected {
+ t.Errorf("isUnderPath(%q, %q) = %v, want %v", tt.child, tt.parent, result, tt.expected)
+ }
+ })
+ }
+}
+
+func Test_isUnderBucketPath(t *testing.T) {
+ tests := []struct {
+ name string
+ directory string
+ bucketPath string
+ expected bool
+ }{
+ // Should NOT process - bucket path itself
+ {"bucket path itself", "/buckets", "/buckets", false},
+ // Should NOT process - bucket directory (immediate child)
+ {"bucket directory", "/buckets/mybucket", "/buckets", false},
+ // Should process - folder inside bucket
+ {"folder in bucket", "/buckets/mybucket/folder", "/buckets", true},
+ // Should process - nested folder
+ {"nested folder", "/buckets/mybucket/a/b/c", "/buckets", true},
+ // Should NOT process - outside buckets
+ {"outside buckets", "/other/path", "/buckets", false},
+ // Empty bucket path allows all
+ {"empty bucket path", "/any/path", "", true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := isUnderBucketPath(tt.directory, tt.bucketPath)
+ if result != tt.expected {
+ t.Errorf("isUnderBucketPath(%q, %q) = %v, want %v", tt.directory, tt.bucketPath, result, tt.expected)
+ }
+ })
+ }
+}
+
+// TestEmptyFolderCleaner_ownsFolder verifies that, with three filers sharing one
+// lock ring, every folder is owned by exactly one of them.
+func TestEmptyFolderCleaner_ownsFolder(t *testing.T) {
+	// One ring shared by three filers.
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{
+		"filer1:8888",
+		"filer2:8888",
+		"filer3:8888",
+	})
+
+	// One cleaner per filer, all consulting the same ring.
+	cleaners := []*EmptyFolderCleaner{
+		{lockRing: ring, host: "filer1:8888"},
+		{lockRing: ring, host: "filer2:8888"},
+		{lockRing: ring, host: "filer3:8888"},
+	}
+
+	folders := []string{
+		"/buckets/mybucket/folder1",
+		"/buckets/mybucket/folder2",
+		"/buckets/mybucket/folder3",
+		"/buckets/mybucket/a/b/c",
+		"/buckets/otherbucket/x",
+	}
+
+	// Count how many cleaners claim each folder; it must be exactly one.
+	for _, folder := range folders {
+		owners := 0
+		for _, c := range cleaners {
+			if c.ownsFolder(folder) {
+				owners++
+			}
+		}
+
+		if owners != 1 {
+			t.Errorf("folder %q owned by %d filers, expected exactly 1", folder, owners)
+		}
+	}
+}
+
+// TestEmptyFolderCleaner_ownsFolder_singleServer verifies that the only filer
+// in the ring owns every folder.
+func TestEmptyFolderCleaner_ownsFolder_singleServer(t *testing.T) {
+	// Ring containing just one filer.
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	soleCleaner := &EmptyFolderCleaner{
+		lockRing: ring,
+		host:     "filer1:8888",
+	}
+
+	folders := []string{
+		"/buckets/mybucket/folder1",
+		"/buckets/mybucket/folder2",
+		"/buckets/otherbucket/x",
+	}
+
+	for _, folder := range folders {
+		if soleCleaner.ownsFolder(folder) {
+			continue
+		}
+		t.Errorf("single filer should own folder %q", folder)
+	}
+}
+
+// TestEmptyFolderCleaner_ownsFolder_emptyRing verifies that a cleaner whose
+// lock ring has no snapshot set still reports ownership of a folder.
+func TestEmptyFolderCleaner_ownsFolder_emptyRing(t *testing.T) {
+	// The ring is created but never given a server snapshot.
+	emptyRing := lock_manager.NewLockRing(5 * time.Second)
+	cleaner := &EmptyFolderCleaner{lockRing: emptyRing, host: "filer1:8888"}
+
+	owned := cleaner.ownsFolder("/buckets/mybucket/folder")
+	if !owned {
+		t.Error("should own folder with empty ring")
+	}
+}
+
+// TestEmptyFolderCleaner_OnCreateEvent_cancelsCleanup verifies that a create
+// event in a folder removes the cleanup that an earlier delete event queued.
+func TestEmptyFolderCleaner_OnCreateEvent_cancelsCleanup(t *testing.T) {
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     ring,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      true,
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		stopCh:       make(chan struct{}),
+	}
+
+	folder := "/buckets/mybucket/testfolder"
+	deletedAt := time.Now()
+
+	// A delete in the folder queues it for cleanup.
+	cleaner.OnDeleteEvent(folder, "file.txt", false, deletedAt)
+	if pending := cleaner.GetPendingCleanupCount(); pending != 1 {
+		t.Errorf("expected 1 pending cleanup, got %d", pending)
+	}
+
+	// A subsequent create in the same folder cancels the queued cleanup.
+	cleaner.OnCreateEvent(folder, "newfile.txt", false)
+	if pending := cleaner.GetPendingCleanupCount(); pending != 0 {
+		t.Errorf("expected 0 pending cleanups after create, got %d", pending)
+	}
+
+	cleaner.Stop()
+}
+
+// TestEmptyFolderCleaner_OnDeleteEvent_deduplication verifies that several
+// delete events for the same folder leave a single entry in the cleanup queue.
+func TestEmptyFolderCleaner_OnDeleteEvent_deduplication(t *testing.T) {
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     ring,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      true,
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		stopCh:       make(chan struct{}),
+	}
+
+	folder := "/buckets/mybucket/testfolder"
+	base := time.Now()
+
+	// Five deletes of file0.txt .. file4.txt, one second apart, all in one folder.
+	for i := 0; i < 5; i++ {
+		fileName := "file" + string(rune('0'+i)) + ".txt"
+		eventTime := base.Add(time.Duration(i) * time.Second)
+		cleaner.OnDeleteEvent(folder, fileName, false, eventTime)
+	}
+
+	// The folder must appear in the queue only once.
+	if pending := cleaner.GetPendingCleanupCount(); pending != 1 {
+		t.Errorf("expected 1 pending cleanup after deduplication, got %d", pending)
+	}
+
+	cleaner.Stop()
+}
+
+// TestEmptyFolderCleaner_OnDeleteEvent_multipleFolders verifies that delete
+// events in distinct folders each produce their own cleanup queue entry.
+func TestEmptyFolderCleaner_OnDeleteEvent_multipleFolders(t *testing.T) {
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     ring,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      true,
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		stopCh:       make(chan struct{}),
+	}
+
+	base := time.Now()
+
+	// One delete per folder, spaced one second apart.
+	folders := []string{
+		"/buckets/mybucket/folder1",
+		"/buckets/mybucket/folder2",
+		"/buckets/mybucket/folder3",
+	}
+	for i, folder := range folders {
+		cleaner.OnDeleteEvent(folder, "file.txt", false, base.Add(time.Duration(i)*time.Second))
+	}
+
+	// All three folders must be queued.
+	if pending := cleaner.GetPendingCleanupCount(); pending != 3 {
+		t.Errorf("expected 3 pending cleanups, got %d", pending)
+	}
+
+	cleaner.Stop()
+}
+
+// TestEmptyFolderCleaner_OnDeleteEvent_notOwner verifies that a filer does not
+// queue a cleanup for a folder that, according to the lock ring, it does not own.
+// The test is skipped when none of the probed folder names map to the other filer.
+func TestEmptyFolderCleaner_OnDeleteEvent_notOwner(t *testing.T) {
+	lockRing := lock_manager.NewLockRing(5 * time.Second)
+	lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888", "filer2:8888"})
+
+	// Cleaner running as filer1; the ring also contains filer2.
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     lockRing,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      true,
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		stopCh:       make(chan struct{}),
+	}
+	// Deferred so the cleaner is also stopped on the t.Skip path below;
+	// t.Skip exits the test immediately and would bypass a trailing Stop call.
+	defer cleaner.Stop()
+
+	now := time.Now()
+
+	// Probe up to 100 folder names (two-digit suffix, ones digit first),
+	// looking for one that filer1 doesn't own.
+	foundNonOwned := false
+	for i := 0; i < 100; i++ {
+		folder := "/buckets/mybucket/folder" + string(rune('0'+i%10)) + string(rune('0'+i/10))
+		if cleaner.ownsFolder(folder) {
+			continue
+		}
+
+		// This folder is not owned by filer1, so the delete must not be queued.
+		cleaner.OnDeleteEvent(folder, "file.txt", false, now)
+		if cleaner.GetPendingCleanupCount() != 0 {
+			t.Errorf("non-owner should not queue cleanup for folder %s", folder)
+		}
+		foundNonOwned = true
+		break
+	}
+
+	if !foundNonOwned {
+		t.Skip("could not find a folder not owned by filer1")
+	}
+}
+
+// TestEmptyFolderCleaner_OnDeleteEvent_disabled verifies that a cleaner with
+// enabled set to false ignores delete events.
+func TestEmptyFolderCleaner_OnDeleteEvent_disabled(t *testing.T) {
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     ring,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      false, // the cleaner under test is switched off
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		stopCh:       make(chan struct{}),
+	}
+
+	folder := "/buckets/mybucket/testfolder"
+	deletedAt := time.Now()
+
+	// Deliver a delete event to the disabled cleaner.
+	cleaner.OnDeleteEvent(folder, "file.txt", false, deletedAt)
+
+	// Nothing may have been queued.
+	if pending := cleaner.GetPendingCleanupCount(); pending != 0 {
+		t.Errorf("disabled cleaner should not queue cleanup, got %d", pending)
+	}
+
+	cleaner.Stop()
+}
+
+// TestEmptyFolderCleaner_OnDeleteEvent_directoryDeletion verifies that deleting
+// a subdirectory queues the parent folder for cleanup, just like deleting a file.
+func TestEmptyFolderCleaner_OnDeleteEvent_directoryDeletion(t *testing.T) {
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     ring,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      true,
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		stopCh:       make(chan struct{}),
+	}
+
+	parent := "/buckets/mybucket/testfolder"
+	deletedAt := time.Now()
+
+	// Removing a subdirectory can leave the parent empty too, so the event is
+	// delivered with isDirectory set to true.
+	cleaner.OnDeleteEvent(parent, "subdir", true, deletedAt)
+
+	// The parent must have been queued.
+	if pending := cleaner.GetPendingCleanupCount(); pending != 1 {
+		t.Errorf("directory deletion should trigger cleanup, got %d", pending)
+	}
+
+	cleaner.Stop()
+}
+
+// TestEmptyFolderCleaner_cachedCounts verifies that create events raise and
+// delete events lower the cached rough count of a folder that is already cached.
+func TestEmptyFolderCleaner_cachedCounts(t *testing.T) {
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     ring,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      true,
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		stopCh:       make(chan struct{}),
+	}
+
+	folder := "/buckets/mybucket/testfolder"
+
+	// Seed the cache with a count of 5.
+	cleaner.folderCounts[folder] = &folderState{roughCount: 5}
+
+	// Two creates: 5 -> 7.
+	for _, created := range []string{"newfile1.txt", "newfile2.txt"} {
+		cleaner.OnCreateEvent(folder, created, false)
+	}
+
+	got, cached := cleaner.GetCachedFolderCount(folder)
+	if !cached {
+		t.Error("cached folder count should exist")
+	}
+	if got != 7 {
+		t.Errorf("expected cached count 7, got %d", got)
+	}
+
+	// Two deletes, one second apart: 7 -> 5.
+	base := time.Now()
+	cleaner.OnDeleteEvent(folder, "file1.txt", false, base)
+	cleaner.OnDeleteEvent(folder, "file2.txt", false, base.Add(1*time.Second))
+
+	got, cached = cleaner.GetCachedFolderCount(folder)
+	if !cached {
+		t.Error("cached folder count should exist")
+	}
+	if got != 5 {
+		t.Errorf("expected cached count 5, got %d", got)
+	}
+
+	cleaner.Stop()
+}
+
+// TestEmptyFolderCleaner_Stop verifies that Stop empties the cleanup queue.
+func TestEmptyFolderCleaner_Stop(t *testing.T) {
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     ring,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      true,
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		stopCh:       make(chan struct{}),
+	}
+
+	base := time.Now()
+
+	// Queue three cleanups, one second apart.
+	deletions := []struct {
+		folder string
+		file   string
+	}{
+		{"/buckets/mybucket/folder1", "file1.txt"},
+		{"/buckets/mybucket/folder2", "file2.txt"},
+		{"/buckets/mybucket/folder3", "file3.txt"},
+	}
+	for i, d := range deletions {
+		cleaner.OnDeleteEvent(d.folder, d.file, false, base.Add(time.Duration(i)*time.Second))
+	}
+
+	// Something must be queued before stopping.
+	if pending := cleaner.GetPendingCleanupCount(); pending < 1 {
+		t.Error("expected at least 1 pending cleanup before stop")
+	}
+
+	cleaner.Stop()
+
+	// After Stop the queue must be empty.
+	if pending := cleaner.GetPendingCleanupCount(); pending != 0 {
+		t.Errorf("expected 0 pending cleanups after stop, got %d", pending)
+	}
+}
+
+func TestEmptyFolderCleaner_cacheEviction(t *testing.T) {
+ lockRing := lock_manager.NewLockRing(5 * time.Second)
+ lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+ cleaner := &EmptyFolderCleaner{
+ lockRing: lockRing,
+ host: "filer1:8888",
+ bucketPath: "/buckets",
+ enabled: true,
+ folderCounts: make(map[string]*folderState),
+ cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+ cacheExpiry: 100 * time.Millisecond, // Short expiry for testing
+ stopCh: make(chan struct{}),
+ }
+
+ folder1 := "/buckets/mybucket/folder1"
+ folder2 := "/buckets/mybucket/folder2"
+ folder3 := "/buckets/mybucket/folder3"
+
+ // Add some cache entries with old timestamps
+ oldTime := time.Now().Add(-1 * time.Hour)
+ cleaner.folderCounts[folder1] = &folderState{roughCount: 5, lastCheck: oldTime}
+ cleaner.folderCounts[folder2] = &folderState{roughCount: 3, lastCheck: oldTime}
+ // folder3 has recent activity
+ cleaner.folderCounts[folder3] = &folderState{roughCount: 2, lastCheck: time.Now()}
+
+ // Verify all entries exist
+ if len(cleaner.folderCounts) != 3 {
+ t.Errorf("expected 3 cache entries, got %d", len(cleaner.folderCounts))
+ }
+
+ // Run eviction
+ cleaner.evictStaleCacheEntries()
+
+ // Verify stale entries are evicted
+ if len(cleaner.folderCounts) != 1 {
+ t.Errorf("expected 1 cache entry after eviction, got %d", len(cleaner.folderCounts))
+ }
+
+ // Verify the recent entry still exists
+ if _, exists := cleaner.folderCounts[folder3]; !exists {
+ t.Error("expected folder3 to still exist in cache")
+ }
+
+ // Verify stale entries are removed
+ if _, exists := cleaner.folderCounts[folder1]; exists {
+ t.Error("expected folder1 to be evicted")
+ }
+ if _, exists := cleaner.folderCounts[folder2]; exists {
+ t.Error("expected folder2 to be evicted")
+ }
+
+ cleaner.Stop()
+}
+
+// TestEmptyFolderCleaner_cacheEviction_skipsEntriesInQueue verifies that a stale
+// cache entry is kept by evictStaleCacheEntries while its folder is in the cleanup queue.
+func TestEmptyFolderCleaner_cacheEviction_skipsEntriesInQueue(t *testing.T) {
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     ring,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      true,
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		cacheExpiry:  100 * time.Millisecond,
+		stopCh:       make(chan struct{}),
+	}
+
+	queued := "/buckets/mybucket/folder"
+	hourAgo := time.Now().Add(-1 * time.Hour)
+
+	// The entry is stale by age, but the folder is also placed in the cleanup queue.
+	cleaner.folderCounts[queued] = &folderState{roughCount: 0, lastCheck: hourAgo}
+	cleaner.cleanupQueue.Add(queued, time.Now())
+
+	cleaner.evictStaleCacheEntries()
+
+	// Being queued must have protected the entry from eviction.
+	_, kept := cleaner.folderCounts[queued]
+	if !kept {
+		t.Error("expected folder to still exist in cache (is in cleanup queue)")
+	}
+
+	cleaner.Stop()
+}
+
+// TestEmptyFolderCleaner_queueFIFOOrder verifies that folders queued with
+// increasing event times are popped from the cleanup queue in the same order.
+func TestEmptyFolderCleaner_queueFIFOOrder(t *testing.T) {
+	ring := lock_manager.NewLockRing(5 * time.Second)
+	ring.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
+
+	cleaner := &EmptyFolderCleaner{
+		lockRing:     ring,
+		host:         "filer1:8888",
+		bucketPath:   "/buckets",
+		enabled:      true,
+		folderCounts: make(map[string]*folderState),
+		cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
+		stopCh:       make(chan struct{}),
+	}
+
+	base := time.Now()
+
+	// Queue the folders one second apart, in slice order.
+	inOrder := []string{
+		"/buckets/mybucket/folder1",
+		"/buckets/mybucket/folder2",
+		"/buckets/mybucket/folder3",
+	}
+	for i, folder := range inOrder {
+		eventTime := base.Add(time.Duration(i) * time.Second)
+		cleaner.OnDeleteEvent(folder, "file.txt", false, eventTime)
+	}
+
+	if pending := cleaner.GetPendingCleanupCount(); pending != 3 {
+		t.Errorf("expected 3 queued folders, got %d", pending)
+	}
+
+	// Popping must return the folders oldest-first.
+	for i, want := range inOrder {
+		got, ok := cleaner.cleanupQueue.Pop()
+		if ok && got == want {
+			continue
+		}
+		t.Errorf("expected folder %s at index %d, got %s", want, i, got)
+	}
+
+	cleaner.Stop()
+}
+
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index f9f3d4fb2..382eb644f 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -11,6 +11,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
+ "github.com/seaweedfs/seaweedfs/weed/filer/empty_folder_cleanup"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -56,6 +57,7 @@ type Filer struct {
MaxFilenameLength uint32
deletionQuit chan struct{}
DeletionRetryQueue *DeletionRetryQueue
+ EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner
}
func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer {
@@ -116,6 +118,9 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste
f.Dlm.LockRing.SetSnapshot(snapshot)
glog.V(0).Infof("%s aggregate from peers %+v", self, snapshot)
+ // Initialize the empty folder cleaner using the same LockRing as Dlm for consistent hashing
+ f.EmptyFolderCleaner = empty_folder_cleanup.NewEmptyFolderCleaner(f, f.Dlm.LockRing, self, f.DirBucketsPath)
+
f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
f.MasterClient.SetOnPeerUpdateFn(func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
if update.NodeType != cluster.FilerType {
@@ -506,6 +511,9 @@ func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bo
func (f *Filer) Shutdown() {
close(f.deletionQuit)
+ if f.EmptyFolderCleaner != nil {
+ f.EmptyFolderCleaner.Stop()
+ }
f.LocalMetaLogBuffer.ShutdownLogBuffer()
f.Store.Shutdown()
}
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 845a0678e..45c9b070f 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -66,6 +66,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
f.logMetaEvent(ctx, fullpath, eventNotification)
+ // Trigger empty folder cleanup for local events
+ // Remote events are handled via MetaAggregator.onMetadataChangeEvent
+ f.triggerLocalEmptyFolderCleanup(oldEntry, newEntry)
+
}
func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotification *filer_pb.EventNotification) {
@@ -89,6 +93,41 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica
}
+// triggerLocalEmptyFolderCleanup forwards a local entry change to the
+// EmptyFolderCleaner, if one is configured and enabled.
+// This is needed because onMetadataChangeEvent is only called for remote peer events.
+//
+// A nil newEntry is reported as a delete, a nil oldEntry as a create, and two
+// entries with differing paths as a delete at the old path plus a create at the new one.
+func (f *Filer) triggerLocalEmptyFolderCleanup(oldEntry, newEntry *Entry) {
+	cleaner := f.EmptyFolderCleaner
+	if cleaner == nil || !cleaner.IsEnabled() {
+		return
+	}
+
+	eventTime := time.Now()
+
+	switch {
+	case oldEntry != nil && newEntry == nil:
+		// Delete: only the old entry exists.
+		dir, name := oldEntry.FullPath.DirAndName()
+		cleaner.OnDeleteEvent(dir, name, oldEntry.IsDirectory(), eventTime)
+
+	case oldEntry == nil && newEntry != nil:
+		// Create: only the new entry exists.
+		dir, name := newEntry.FullPath.DirAndName()
+		cleaner.OnCreateEvent(dir, name, newEntry.IsDirectory())
+
+	case oldEntry != nil && newEntry != nil:
+		// Both exist: only a change of directory or name is of interest.
+		oldDir, oldName := oldEntry.FullPath.DirAndName()
+		newDir, newName := newEntry.FullPath.DirAndName()
+		if oldDir == newDir && oldName == newName {
+			return
+		}
+		// The old location lost an entry, the new location gained one.
+		cleaner.OnDeleteEvent(oldDir, oldName, oldEntry.IsDirectory(), eventTime)
+		cleaner.OnCreateEvent(newDir, newName, newEntry.IsDirectory())
+	}
+}
+
func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
if len(buf) == 0 {
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index acbf4aa47..4ee80b3a6 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -2,6 +2,7 @@ package filer
import (
"bytes"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -13,6 +14,7 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse)
f.maybeReloadFilerConfiguration(event)
f.maybeReloadRemoteStorageConfigurationAndMapping(event)
f.onBucketEvents(event)
+ f.onEmptyFolderCleanupEvents(event)
}
func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
@@ -32,6 +34,43 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
}
}
+// onEmptyFolderCleanupEvents handles create/delete events for empty folder cleanup
+func (f *Filer) onEmptyFolderCleanupEvents(event *filer_pb.SubscribeMetadataResponse) {
+ if f.EmptyFolderCleaner == nil || !f.EmptyFolderCleaner.IsEnabled() {
+ return
+ }
+
+ message := event.EventNotification
+ directory := event.Directory
+ eventTime := time.Unix(0, event.TsNs)
+
+ // Handle delete events - trigger folder cleanup check
+ if filer_pb.IsDelete(event) && message.OldEntry != nil {
+ f.EmptyFolderCleaner.OnDeleteEvent(directory, message.OldEntry.Name, message.OldEntry.IsDirectory, eventTime)
+ }
+
+ // Handle create events - cancel pending cleanup for the folder
+ if filer_pb.IsCreate(event) && message.NewEntry != nil {
+ f.EmptyFolderCleaner.OnCreateEvent(directory, message.NewEntry.Name, message.NewEntry.IsDirectory)
+ }
+
+ // Handle rename/move events
+ if filer_pb.IsRename(event) {
+ // Treat the old location as a delete
+ if message.OldEntry != nil {
+ f.EmptyFolderCleaner.OnDeleteEvent(directory, message.OldEntry.Name, message.OldEntry.IsDirectory, eventTime)
+ }
+ // Treat the new location as a create
+ if message.NewEntry != nil {
+ newDir := message.NewParentPath
+ if newDir == "" {
+ newDir = directory
+ }
+ f.EmptyFolderCleaner.OnCreateEvent(newDir, message.NewEntry.Name, message.NewEntry.IsDirectory)
+ }
+ }
+}
+
func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) {
if DirectoryEtcSeaweedFS != event.Directory {
if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath {
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index 294fc0e7f..e6366e82f 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -41,6 +41,19 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
return entries, hasMore, err
}
+// CountDirectoryEntries counts the entries of directory p, listing at most limit of them.
+// When the listing reports that more entries exist, limit itself is returned,
+// meaning "at least this many". A listing error is returned with a count of 0.
+func (f *Filer) CountDirectoryEntries(ctx context.Context, p util.FullPath, limit int) (count int, err error) {
+	listed, truncated, listErr := f.ListDirectoryEntries(ctx, p, "", false, int64(limit), "", "", "")
+	switch {
+	case listErr != nil:
+		return 0, listErr
+	case truncated:
+		// At least this many
+		return limit, nil
+	default:
+		return len(listed), nil
+	}
+}
+
// For now, prefix and namePattern are mutually exclusive
func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go
index f779a6edc..6e373bb4e 100644
--- a/weed/s3api/s3api_object_handlers_delete.go
+++ b/weed/s3api/s3api_object_handlers_delete.go
@@ -1,12 +1,10 @@
package s3api
import (
- "context"
"encoding/xml"
"fmt"
"io"
"net/http"
- "slices"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -127,22 +125,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
dir, name := target.DirAndName()
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- // Use operation context that won't be cancelled if request terminates
- // This ensures deletion completes atomically to avoid inconsistent state
- opCtx := context.WithoutCancel(r.Context())
-
- if err := doDeleteEntry(client, dir, name, true, false); err != nil {
- return err
- }
-
- // Cleanup empty directories
- if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
- bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
- // Recursively delete empty parent directories, stop at bucket path
- filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil)
- }
-
- return nil
+ return doDeleteEntry(client, dir, name, true, false)
+ // Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
+ // which listens to metadata events and uses consistent hashing for coordination
})
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -222,8 +207,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var deleteErrors []DeleteError
var auditLog *s3err.AccessLog
- directoriesWithDeletion := make(map[string]bool)
-
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
@@ -245,10 +228,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
versioningConfigured := (versioningState != "")
s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- // Use operation context that won't be cancelled if request terminates
- // This ensures batch deletion completes atomically to avoid inconsistent state
- opCtx := context.WithoutCancel(r.Context())
-
// delete file entries
for _, object := range deleteObjects.Objects {
if object.Key == "" {
@@ -357,10 +336,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
- // Track directory for empty directory cleanup
- if !s3a.option.AllowEmptyFolder {
- directoriesWithDeletion[parentDirectoryPath] = true
- }
deletedObjects = append(deletedObjects, object)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object)
@@ -380,30 +355,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
}
- // Cleanup empty directories - optimize by processing deepest first
- if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 {
- bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
-
- // Collect and sort directories by depth (deepest first) to avoid redundant checks
- var allDirs []string
- for dirPath := range directoriesWithDeletion {
- allDirs = append(allDirs, dirPath)
- }
- // Sort by depth (deeper directories first)
- slices.SortFunc(allDirs, func(a, b string) int {
- return strings.Count(b, "/") - strings.Count(a, "/")
- })
-
- // Track already-checked directories to avoid redundant work
- checked := make(map[string]bool)
- for _, dirPath := range allDirs {
- if !checked[dirPath] {
- // Recursively delete empty parent directories, stop at bucket path
- // Mark this directory and all its parents as checked during recursion
- filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked)
- }
- }
- }
+ // Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
+ // which listens to metadata events and uses consistent hashing for coordination
return nil
})