diff options
Diffstat (limited to 'weed/filer/filer.go')
| -rw-r--r-- | weed/filer/filer.go | 133 |
1 files changed, 129 insertions, 4 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index b86ac3c5b..d3d2de948 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -351,37 +351,162 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e } entry, err = f.Store.FindEntry(ctx, p) if entry != nil && entry.TtlSec > 0 { - if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + if entry.IsExpireS3Enabled() { + if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() { + if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil { + glog.ErrorfCtx(ctx, "FindEntry doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr) + } + return nil, filer_pb.ErrNotFound + } + } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { f.Store.DeleteOneEntry(ctx, entry) return nil, filer_pb.ErrNotFound } } - return + return entry, err } func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) { + // Collect expired entries during iteration to avoid deadlock with DB connection pool + var expiredEntries []*Entry + var s3ExpiredEntries []*Entry + var hasValidEntries bool + lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { select { case <-ctx.Done(): return false default: if entry.TtlSec > 0 { - if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - f.Store.DeleteOneEntry(ctx, entry) + if entry.IsExpireS3Enabled() { + if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() { + // Collect for deletion after iteration completes to avoid DB deadlock + s3ExpiredEntries = append(s3ExpiredEntries, entry) + expiredCount++ + return true + } + } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + // Collect for deletion after iteration completes to avoid DB deadlock + expiredEntries = append(expiredEntries, entry) expiredCount++ return true } } + // Track that we found at least one valid (non-expired) entry + hasValidEntries = true return eachEntryFunc(entry) } }) if err != nil { return expiredCount, lastFileName, err } + + // Delete expired entries after iteration completes to avoid DB connection deadlock + if len(s3ExpiredEntries) > 0 || len(expiredEntries) > 0 { + for _, entry := range s3ExpiredEntries { + if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil { + glog.ErrorfCtx(ctx, "doListDirectoryEntries doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr) + } + } + for _, entry := range expiredEntries { + if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil { + glog.ErrorfCtx(ctx, "doListDirectoryEntries DeleteOneEntry %s failed: %v", entry.FullPath, delErr) + } + } + + // After expiring entries, the directory might be empty. + // Attempt to clean it up and any empty parent directories. + if !hasValidEntries && p != "/" && startFileName == "" { + stopAtPath := util.FullPath(f.DirBucketsPath) + f.DeleteEmptyParentDirectories(ctx, p, stopAtPath) + } + } + return } +// DeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty. +// It stops at root "/" or at stopAtPath (if provided). +// This is useful for cleaning up directories after deleting files or expired entries. +// +// IMPORTANT: For safety, dirPath must be under stopAtPath (when stopAtPath is provided). +// This prevents accidental deletion of directories outside the intended scope (e.g., outside bucket paths). +// +// Example usage: +// +// // After deleting /bucket/dir/subdir/file.txt, clean up empty parent directories +// // but stop at the bucket path +// parentPath := util.FullPath("/bucket/dir/subdir") +// filer.DeleteEmptyParentDirectories(ctx, parentPath, util.FullPath("/bucket")) +// +// Example with gRPC client: +// +// if err := pb_filer_client.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { +// return filer_pb.Traverse(ctx, filer, parentPath, "", func(entry *filer_pb.Entry) error { +// // Process entries... +// }) +// }); err == nil { +// filer.DeleteEmptyParentDirectories(ctx, parentPath, stopPath) +// } +func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.FullPath, stopAtPath util.FullPath) { + if dirPath == "/" || dirPath == stopAtPath { + return + } + + // Safety check: if stopAtPath is provided, dirPath must be under it (root "/" allows everything) + stopStr := string(stopAtPath) + if stopAtPath != "" && stopStr != "/" && !strings.HasPrefix(string(dirPath)+"/", stopStr+"/") { + glog.V(1).InfofCtx(ctx, "DeleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath) + return + } + + // Additional safety: prevent deletion of bucket-level directories + // This protects /buckets/mybucket from being deleted even if empty + baseDepth := strings.Count(f.DirBucketsPath, "/") + dirDepth := strings.Count(string(dirPath), "/") + if dirDepth <= baseDepth+1 { + glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: skipping deletion of bucket-level directory %s", dirPath) + return + } + + // Check if directory is empty + isEmpty, err := f.IsDirectoryEmpty(ctx, dirPath) + if err != nil { + glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: error checking %s: %v", dirPath, err) + return + } + + if !isEmpty { + // Directory is not empty, stop checking upward + glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath) + return + } + + // Directory is empty, try to delete it + glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: deleting empty directory %s", dirPath) + parentDir, _ := dirPath.DirAndName() + if dirEntry, findErr := f.FindEntry(ctx, dirPath); findErr == nil { + if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil { + // Successfully deleted, continue checking upwards + f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), stopAtPath) + } else { + // Failed to delete, stop cleanup + glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: failed to delete %s: %v", dirPath, delErr) + } + } +} + +// IsDirectoryEmpty checks if a directory contains any entries +func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) { + isEmpty := true + _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool { + isEmpty = false + return false // Stop after first entry + }) + return isEmpty, err +} + func (f *Filer) Shutdown() { close(f.deletionQuit) f.LocalMetaLogBuffer.ShutdownLogBuffer() |
