aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_delete.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_delete.go')
-rw-r--r--weed/s3api/s3api_object_handlers_delete.go109
1 files changed, 63 insertions, 46 deletions
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go
index 3a2544710..fcaac5da0 100644
--- a/weed/s3api/s3api_object_handlers_delete.go
+++ b/weed/s3api/s3api_object_handlers_delete.go
@@ -1,21 +1,18 @@
package s3api
import (
+ "context"
"encoding/xml"
"fmt"
"io"
"net/http"
- "slices"
"strings"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
-
"github.com/seaweedfs/seaweedfs/weed/filer"
-
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
-
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
)
@@ -134,17 +131,11 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
return err
}
- if s3a.option.AllowEmptyFolder {
- return nil
- }
-
- directoriesWithDeletion := make(map[string]int)
- if strings.LastIndex(object, "/") > 0 {
- directoriesWithDeletion[dir]++
- // purge empty folders, only checking folders with deletions
- for len(directoriesWithDeletion) > 0 {
- directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
- }
+ // Cleanup empty directories
+ if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
+ bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
+ // Recursively delete empty parent directories, stop at bucket path
+ deleteEmptyParentDirectories(client, util.FullPath(dir), util.FullPath(bucketPath))
}
return nil
@@ -227,7 +218,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var deleteErrors []DeleteError
var auditLog *s3err.AccessLog
- directoriesWithDeletion := make(map[string]int)
+ directoriesWithDeletion := make(map[string]bool)
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
@@ -359,12 +350,14 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
- directoriesWithDeletion[parentDirectoryPath]++
+ // Track directory for empty directory cleanup
+ if !s3a.option.AllowEmptyFolder {
+ directoriesWithDeletion[parentDirectoryPath] = true
+ }
deletedObjects = append(deletedObjects, object)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object)
} else {
- delete(directoriesWithDeletion, parentDirectoryPath)
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
@@ -380,13 +373,13 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
}
- if s3a.option.AllowEmptyFolder {
- return nil
- }
-
- // purge empty folders, only checking folders with deletions
- for len(directoriesWithDeletion) > 0 {
- directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
+ // Cleanup empty directories
+ if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 {
+ bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
+ for dirPath := range directoriesWithDeletion {
+ // Recursively delete empty parent directories, stop at bucket path
+ deleteEmptyParentDirectories(client, util.FullPath(dirPath), util.FullPath(bucketPath))
+ }
}
return nil
@@ -404,25 +397,49 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
-func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) {
- var allDirs []string
- for dir := range directoriesWithDeletion {
- allDirs = append(allDirs, dir)
+// deleteEmptyParentDirectories recursively deletes empty parent directories.
+// It stops at root "/" or at stopAtPath.
+// This implements the same logic as filer.DeleteEmptyParentDirectories but uses gRPC client.
+// For safety, dirPath must be under stopAtPath (when stopAtPath is provided).
+func deleteEmptyParentDirectories(client filer_pb.SeaweedFilerClient, dirPath util.FullPath, stopAtPath util.FullPath) {
+ if dirPath == "/" || dirPath == stopAtPath {
+ return
}
- slices.SortFunc(allDirs, func(a, b string) int {
- return len(b) - len(a)
- })
- newDirectoriesWithDeletion = make(map[string]int)
- for _, dir := range allDirs {
- parentDir, dirName := util.FullPath(dir).DirAndName()
- if parentDir == s3a.option.BucketsPath {
- continue
- }
- if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil {
- glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err)
- } else {
- newDirectoriesWithDeletion[parentDir]++
- }
+
+ // Safety check: if stopAtPath is provided, dirPath must be under it
+ if stopAtPath != "" && !strings.HasPrefix(string(dirPath)+"/", string(stopAtPath)+"/") {
+ glog.V(1).Infof("deleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath)
+ return
+ }
+
+ // Check if directory is empty by listing with limit 1
+ ctx := context.Background()
+ isEmpty := true
+ err := filer_pb.SeaweedList(ctx, client, string(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+ isEmpty = false
+ return nil
+ }, "", false, 1)
+
+ if err != nil {
+ glog.V(3).Infof("deleteEmptyParentDirectories: error checking %s: %v", dirPath, err)
+ return
+ }
+
+ if !isEmpty {
+ // Directory is not empty, stop checking upward
+ glog.V(3).Infof("deleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath)
+ return
+ }
+
+ // Directory is empty, try to delete it
+ glog.V(2).Infof("deleteEmptyParentDirectories: deleting empty directory %s", dirPath)
+ parentDir, dirName := dirPath.DirAndName()
+
+ if err := doDeleteEntry(client, parentDir, dirName, false, false); err == nil {
+ // Successfully deleted, continue checking upwards
+ deleteEmptyParentDirectories(client, util.FullPath(parentDir), stopAtPath)
+ } else {
+ // Failed to delete, stop cleanup
+ glog.V(3).Infof("deleteEmptyParentDirectories: failed to delete %s: %v", dirPath, err)
}
- return
}