diff options
| author | chrislu <chris.lu@gmail.com> | 2025-11-05 14:17:38 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-11-05 14:17:38 -0800 |
| commit | ec8ca216a5680400d5fa30295e6672c6a291eb62 (patch) | |
| tree | 6b677347ecebb442742a3c3ea0191cbccc1c46f9 | |
| parent | 55ee4513e77d9b0aefb67bb86bb75519ee29aad3 (diff) | |
| download | seaweedfs-ec8ca216a5680400d5fa30295e6672c6a291eb62.tar.xz seaweedfs-ec8ca216a5680400d5fa30295e6672c6a291eb62.zip | |
add context, sort directories by depth (deepest first) to avoid redundant checks
| -rw-r--r-- | weed/s3api/s3api_object_handlers_delete.go | 60 |
1 files changed, 46 insertions, 14 deletions
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 5369aaa5b..04fea80be 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "slices" "strings" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -126,6 +127,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque dir, name := target.DirAndName() err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Use operation context that won't be cancelled if request terminates + // This ensures deletion completes atomically to avoid inconsistent state + opCtx := context.WithoutCancel(r.Context()) if err := doDeleteEntry(client, dir, name, true, false); err != nil { return err @@ -135,7 +139,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 { bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket) // Recursively delete empty parent directories, stop at bucket path - deleteEmptyParentDirectories(client, util.FullPath(dir), util.FullPath(bucketPath)) + deleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil) } return nil @@ -241,6 +245,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h versioningConfigured := (versioningState != "") s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Use operation context that won't be cancelled if request terminates + // This ensures batch deletion completes atomically to avoid inconsistent state + opCtx := context.WithoutCancel(r.Context()) // delete file entries for _, object := range deleteObjects.Objects { @@ -373,12 +380,28 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } } - // Cleanup empty directories + // Cleanup empty directories - optimize by processing deepest first if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 { bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket) + + // Collect and sort directories by depth (deepest first) to avoid redundant checks + var allDirs []string for dirPath := range directoriesWithDeletion { - // Recursively delete empty parent directories, stop at bucket path - deleteEmptyParentDirectories(client, util.FullPath(dirPath), util.FullPath(bucketPath)) + allDirs = append(allDirs, dirPath) + } + // Sort by depth (deeper directories first) + slices.SortFunc(allDirs, func(a, b string) int { + return strings.Count(b, "/") - strings.Count(a, "/") + }) + + // Track already-checked directories to avoid redundant work + checked := make(map[string]bool) + for _, dirPath := range allDirs { + if !checked[dirPath] { + // Recursively delete empty parent directories, stop at bucket path + // Mark this directory and all its parents as checked during recursion + deleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked) + } } } @@ -401,45 +424,54 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h // It stops at root "/" or at stopAtPath. // This implements the same logic as filer.DeleteEmptyParentDirectories but uses gRPC client. // For safety, dirPath must be under stopAtPath (when stopAtPath is provided). -func deleteEmptyParentDirectories(client filer_pb.SeaweedFilerClient, dirPath util.FullPath, stopAtPath util.FullPath) { +// The checked map tracks already-processed directories to avoid redundant work in batch operations. +func deleteEmptyParentDirectories(ctx context.Context, client filer_pb.SeaweedFilerClient, dirPath util.FullPath, stopAtPath util.FullPath, checked map[string]bool) { if dirPath == "/" || dirPath == stopAtPath { return } + // Skip if already checked (for batch delete optimization) + dirPathStr := string(dirPath) + if checked != nil { + if checked[dirPathStr] { + return + } + checked[dirPathStr] = true + } + // Safety check: if stopAtPath is provided, dirPath must be under it - if stopAtPath != "" && !strings.HasPrefix(string(dirPath)+"/", string(stopAtPath)+"/") { - glog.V(1).Infof("deleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath) + if stopAtPath != "" && !strings.HasPrefix(dirPathStr+"/", string(stopAtPath)+"/") { + glog.V(1).InfofCtx(ctx, "deleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath) return } // Check if directory is empty by listing with limit 1 - ctx := context.Background() isEmpty := true - err := filer_pb.SeaweedList(ctx, client, string(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error { + err := filer_pb.SeaweedList(ctx, client, dirPathStr, "", func(entry *filer_pb.Entry, isLast bool) error { isEmpty = false return io.EOF // Use sentinel error to explicitly stop iteration }, "", false, 1) if err != nil && err != io.EOF { - glog.V(3).Infof("deleteEmptyParentDirectories: error checking %s: %v", dirPath, err) + glog.V(3).InfofCtx(ctx, "deleteEmptyParentDirectories: error checking %s: %v", dirPath, err) return } if !isEmpty { // Directory is not empty, stop checking upward - glog.V(3).Infof("deleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath) + glog.V(3).InfofCtx(ctx, "deleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath) return } // Directory is empty, try to delete it - glog.V(2).Infof("deleteEmptyParentDirectories: deleting empty directory %s", dirPath) + glog.V(2).InfofCtx(ctx, "deleteEmptyParentDirectories: deleting empty directory %s", dirPath) parentDir, dirName := dirPath.DirAndName() if err := doDeleteEntry(client, parentDir, dirName, false, false); err == nil { // Successfully deleted, continue checking upwards - deleteEmptyParentDirectories(client, util.FullPath(parentDir), stopAtPath) + deleteEmptyParentDirectories(ctx, client, util.FullPath(parentDir), stopAtPath, checked) } else { // Failed to delete, stop cleanup - glog.V(3).Infof("deleteEmptyParentDirectories: failed to delete %s: %v", dirPath, err) + glog.V(3).InfofCtx(ctx, "deleteEmptyParentDirectories: failed to delete %s: %v", dirPath, err) } } |
