aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/filer.go')
-rw-r--r--weed/filer/filer.go133
1 files changed, 129 insertions, 4 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index b86ac3c5b..d3d2de948 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -351,37 +351,162 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
}
entry, err = f.Store.FindEntry(ctx, p)
if entry != nil && entry.TtlSec > 0 {
- if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ if entry.IsExpireS3Enabled() {
+ if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() {
+ if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
+ glog.ErrorfCtx(ctx, "FindEntry doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
+ }
+ return nil, filer_pb.ErrNotFound
+ }
+ } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteOneEntry(ctx, entry)
return nil, filer_pb.ErrNotFound
}
}
- return
+ return entry, err
}
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
+ // Collect expired entries during iteration to avoid deadlock with DB connection pool
+ var expiredEntries []*Entry
+ var s3ExpiredEntries []*Entry
+ var hasValidEntries bool
+
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
select {
case <-ctx.Done():
return false
default:
if entry.TtlSec > 0 {
- if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- f.Store.DeleteOneEntry(ctx, entry)
+ if entry.IsExpireS3Enabled() {
+ if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() {
+ // Collect for deletion after iteration completes to avoid DB deadlock
+ s3ExpiredEntries = append(s3ExpiredEntries, entry)
+ expiredCount++
+ return true
+ }
+ } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ // Collect for deletion after iteration completes to avoid DB deadlock
+ expiredEntries = append(expiredEntries, entry)
expiredCount++
return true
}
}
+ // Track that we found at least one valid (non-expired) entry
+ hasValidEntries = true
return eachEntryFunc(entry)
}
})
if err != nil {
return expiredCount, lastFileName, err
}
+
+ // Delete expired entries after iteration completes to avoid DB connection deadlock
+ if len(s3ExpiredEntries) > 0 || len(expiredEntries) > 0 {
+ for _, entry := range s3ExpiredEntries {
+ if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
+ glog.ErrorfCtx(ctx, "doListDirectoryEntries doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
+ }
+ }
+ for _, entry := range expiredEntries {
+ if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil {
+ glog.ErrorfCtx(ctx, "doListDirectoryEntries DeleteOneEntry %s failed: %v", entry.FullPath, delErr)
+ }
+ }
+
+ // After expiring entries, the directory might be empty.
+ // Attempt to clean it up and any empty parent directories.
+ if !hasValidEntries && p != "/" && startFileName == "" {
+ stopAtPath := util.FullPath(f.DirBucketsPath)
+ f.DeleteEmptyParentDirectories(ctx, p, stopAtPath)
+ }
+ }
+
return
}
+// DeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty.
+// It stops at root "/" or at stopAtPath (if provided).
+// This is useful for cleaning up directories after deleting files or expired entries.
+//
+// IMPORTANT: For safety, dirPath must be under stopAtPath (when stopAtPath is provided).
+// This prevents accidental deletion of directories outside the intended scope (e.g., outside bucket paths).
+//
+// Example usage:
+//
+// // After deleting /bucket/dir/subdir/file.txt, clean up empty parent directories
+// // but stop at the bucket path
+// parentPath := util.FullPath("/bucket/dir/subdir")
+// filer.DeleteEmptyParentDirectories(ctx, parentPath, util.FullPath("/bucket"))
+//
+// Example with gRPC client:
+//
+// if err := pb_filer_client.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+// return filer_pb.Traverse(ctx, filer, parentPath, "", func(entry *filer_pb.Entry) error {
+// // Process entries...
+// })
+// }); err == nil {
+// filer.DeleteEmptyParentDirectories(ctx, parentPath, stopPath)
+// }
+func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.FullPath, stopAtPath util.FullPath) {
+ if dirPath == "/" || dirPath == stopAtPath {
+ return
+ }
+
+ // Safety check: if stopAtPath is provided, dirPath must be under it (root "/" allows everything)
+ stopStr := string(stopAtPath)
+ if stopAtPath != "" && stopStr != "/" && !strings.HasPrefix(string(dirPath)+"/", stopStr+"/") {
+ glog.V(1).InfofCtx(ctx, "DeleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath)
+ return
+ }
+
+ // Additional safety: prevent deletion of bucket-level directories
+ // This protects /buckets/mybucket from being deleted even if empty
+ baseDepth := strings.Count(f.DirBucketsPath, "/")
+ dirDepth := strings.Count(string(dirPath), "/")
+ if dirDepth <= baseDepth+1 {
+ glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: skipping deletion of bucket-level directory %s", dirPath)
+ return
+ }
+
+ // Check if directory is empty
+ isEmpty, err := f.IsDirectoryEmpty(ctx, dirPath)
+ if err != nil {
+ glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: error checking %s: %v", dirPath, err)
+ return
+ }
+
+ if !isEmpty {
+ // Directory is not empty, stop checking upward
+ glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath)
+ return
+ }
+
+ // Directory is empty, try to delete it
+ glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: deleting empty directory %s", dirPath)
+ parentDir, _ := dirPath.DirAndName()
+ if dirEntry, findErr := f.FindEntry(ctx, dirPath); findErr == nil {
+ if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil {
+ // Successfully deleted, continue checking upwards
+ f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), stopAtPath)
+ } else {
+ // Failed to delete, stop cleanup
+ glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: failed to delete %s: %v", dirPath, delErr)
+ }
+ }
+}
+
+// IsDirectoryEmpty checks if a directory contains any entries
+func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) {
+ isEmpty := true
+ _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool {
+ isEmpty = false
+ return false // Stop after first entry
+ })
+ return isEmpty, err
+}
+
func (f *Filer) Shutdown() {
close(f.deletionQuit)
f.LocalMetaLogBuffer.ShutdownLogBuffer()