aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-11-05 12:42:30 -0800
committerchrislu <chris.lu@gmail.com>2025-11-05 12:42:30 -0800
commit543e70c51180e916b75541de0d0b1c74c05b1c71 (patch)
treef173046ec42b00d44f5223b525c51bd815078014
parent5d5da14e0ed91c613fe5c0ed058f58bb04fba6f0 (diff)
downloadseaweedfs-543e70c51180e916b75541de0d0b1c74c05b1c71.tar.xz
seaweedfs-543e70c51180e916b75541de0d0b1c74c05b1c71.zip
move deletion out of listing transaction; delete entries and empty folders
-rw-r--r--weed/filer/filer.go79
1 files changed, 73 insertions, 6 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index c4ba41491..55dbbd804 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -368,6 +368,11 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
}
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
+ // Collect expired entries during iteration to avoid deadlock with DB connection pool
+ var expiredEntries []*Entry
+ var s3ExpiredEntries []*Entry
+ var hasValidEntries bool
+
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
select {
case <-ctx.Done():
@@ -376,29 +381,91 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
if entry.TtlSec > 0 {
if entry.IsExpireS3Enabled() {
if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() {
- if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
- glog.ErrorfCtx(ctx, "doListDirectoryEntries doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
- }
+ // Collect for deletion after iteration completes to avoid DB deadlock
+ s3ExpiredEntries = append(s3ExpiredEntries, entry)
expiredCount++
return true
}
} else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil {
- glog.ErrorfCtx(ctx, "doListDirectoryEntries DeleteOneEntry %s failed: %v", entry.FullPath, delErr)
- }
+ // Collect for deletion after iteration completes to avoid DB deadlock
+ expiredEntries = append(expiredEntries, entry)
expiredCount++
return true
}
}
+ // Track that we found at least one valid (non-expired) entry
+ hasValidEntries = true
return eachEntryFunc(entry)
}
})
if err != nil {
return expiredCount, lastFileName, err
}
+
+ // Delete expired entries after iteration completes to avoid DB connection deadlock
+ if len(s3ExpiredEntries) > 0 || len(expiredEntries) > 0 {
+ for _, entry := range s3ExpiredEntries {
+ if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
+ glog.ErrorfCtx(ctx, "doListDirectoryEntries doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
+ }
+ }
+ for _, entry := range expiredEntries {
+ if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil {
+ glog.ErrorfCtx(ctx, "doListDirectoryEntries DeleteOneEntry %s failed: %v", entry.FullPath, delErr)
+ }
+ }
+
+ // Check if directory is now empty and delete it if so
+ // Only check if we didn't find any valid entries and we're not at root
+ if !hasValidEntries && p != "/" && startFileName == "" {
+ // Do a quick check to see if directory is truly empty now
+ isEmpty := true
+ _, checkErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, "", true, 1, prefix, func(entry *Entry) bool {
+ isEmpty = false
+ return false // Stop after first entry
+ })
+ if checkErr == nil && isEmpty {
+ glog.V(2).InfofCtx(ctx, "doListDirectoryEntries: deleting empty directory %s after expiring all entries", p)
+ parentDir, _ := p.DirAndName()
+ if dirEntry, findErr := f.FindEntry(ctx, p); findErr == nil {
+ // Delete the now-empty directory
+ if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil {
+ // Recursively try to delete parent directories if they become empty
+ f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(parentDir))
+ }
+ }
+ }
+ }
+ }
+
return
}
+// maybeDeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty
+func (f *Filer) maybeDeleteEmptyParentDirectories(ctx context.Context, parentDir util.FullPath) {
+ if parentDir == "/" {
+ return
+ }
+
+ // Check if parent directory is empty
+ isEmpty := true
+ _, err := f.Store.ListDirectoryPrefixedEntries(ctx, parentDir, "", true, 1, "", func(entry *Entry) bool {
+ isEmpty = false
+ return false // Stop after first entry
+ })
+
+ if err == nil && isEmpty {
+ glog.V(2).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: deleting empty directory %s", parentDir)
+ grandParentDir, _ := parentDir.DirAndName()
+ if parentEntry, findErr := f.FindEntry(ctx, parentDir); findErr == nil {
+ if delErr := f.doDeleteEntryMetaAndData(ctx, parentEntry, false, false, nil); delErr == nil {
+ // Continue checking upwards
+ f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(grandParentDir))
+ }
+ }
+ }
+}
+
func (f *Filer) Shutdown() {
close(f.deletionQuit)
f.LocalMetaLogBuffer.ShutdownLogBuffer()