aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/filer.go')
-rw-r--r--weed/filer/filer.go77
1 files changed, 52 insertions, 25 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 74536098d..75c50871e 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -419,19 +419,14 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
// Only check if we didn't find any valid entries and we're not at root
if !hasValidEntries && p != "/" && startFileName == "" {
// Do a quick check to see if directory is truly empty now
- isEmpty := true
- _, checkErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, "", true, 1, prefix, func(entry *Entry) bool {
- isEmpty = false
- return false // Stop after first entry
- })
- if checkErr == nil && isEmpty {
+ if isEmpty, checkErr := f.IsDirectoryEmpty(ctx, p); checkErr == nil && isEmpty {
glog.V(2).InfofCtx(ctx, "doListDirectoryEntries: deleting empty directory %s after expiring all entries", p)
parentDir, _ := p.DirAndName()
if dirEntry, findErr := f.FindEntry(ctx, p); findErr == nil {
// Delete the now-empty directory
if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil {
// Recursively try to delete parent directories if they become empty
- f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(parentDir))
+ f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), "")
}
}
}
@@ -441,45 +436,77 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
return
}
-// maybeDeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty
-func (f *Filer) maybeDeleteEmptyParentDirectories(ctx context.Context, parentDir util.FullPath) {
- if parentDir == "/" {
+// DeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty.
+// It stops at root "/" or at stopAtPath (if provided).
+// This is useful for cleaning up directories after deleting files or expired entries.
+//
+// IMPORTANT: For safety, dirPath must be under stopAtPath (when stopAtPath is provided).
+// This prevents accidental deletion of directories outside the intended scope (e.g., outside bucket paths).
+//
+// Example usage:
+//
+// // After deleting /bucket/dir/subdir/file.txt, clean up empty parent directories
+// // but stop at the bucket path
+// parentPath := util.FullPath("/bucket/dir/subdir")
+// filer.DeleteEmptyParentDirectories(ctx, parentPath, util.FullPath("/bucket"))
+//
+// Example with gRPC client:
+//
+// if err := pb_filer_client.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+// return filer_pb.Traverse(ctx, filer, parentPath, "", func(entry *filer_pb.Entry) error {
+// // Process entries...
+// })
+// }); err == nil {
+// filer.DeleteEmptyParentDirectories(ctx, parentPath, stopPath)
+// }
+func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.FullPath, stopAtPath util.FullPath) {
+ if dirPath == "/" || dirPath == stopAtPath {
return
}
- // Check if parent directory is empty
- isEmpty := true
- _, err := f.Store.ListDirectoryPrefixedEntries(ctx, parentDir, "", true, 1, "", func(entry *Entry) bool {
- isEmpty = false
- return false // Stop after first entry
- })
+ // Safety check: if stopAtPath is provided, dirPath must be under it
+ if stopAtPath != "" && !strings.HasPrefix(string(dirPath)+"/", string(stopAtPath)+"/") {
+ glog.V(1).InfofCtx(ctx, "DeleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath)
+ return
+ }
+ // Check if directory is empty
+ isEmpty, err := f.IsDirectoryEmpty(ctx, dirPath)
if err != nil {
- // Error checking directory, stop cleanup
- glog.V(3).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: error checking %s: %v", parentDir, err)
+ glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: error checking %s: %v", dirPath, err)
return
}
if !isEmpty {
// Directory is not empty, stop checking upward
- glog.V(3).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", parentDir)
+ glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath)
return
}
// Directory is empty, try to delete it
- glog.V(2).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: deleting empty directory %s", parentDir)
- grandParentDir, _ := parentDir.DirAndName()
- if parentEntry, findErr := f.FindEntry(ctx, parentDir); findErr == nil {
- if delErr := f.doDeleteEntryMetaAndData(ctx, parentEntry, false, false, nil); delErr == nil {
+ glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: deleting empty directory %s", dirPath)
+ parentDir, _ := dirPath.DirAndName()
+ if dirEntry, findErr := f.FindEntry(ctx, dirPath); findErr == nil {
+ if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil {
// Successfully deleted, continue checking upwards
- f.maybeDeleteEmptyParentDirectories(ctx, util.FullPath(grandParentDir))
+ f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), stopAtPath)
} else {
// Failed to delete, stop cleanup
- glog.V(3).InfofCtx(ctx, "maybeDeleteEmptyParentDirectories: failed to delete %s: %v", parentDir, delErr)
+ glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: failed to delete %s: %v", dirPath, delErr)
}
}
}
+// IsDirectoryEmpty checks if a directory contains any entries
+func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) {
+ isEmpty := true
+ _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool {
+ isEmpty = false
+ return false // Stop after first entry
+ })
+ return isEmpty, err
+}
+
func (f *Filer) Shutdown() {
close(f.deletionQuit)
f.LocalMetaLogBuffer.ShutdownLogBuffer()