aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_delete.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_delete.go')
-rw-r--r--weed/s3api/s3api_object_handlers_delete.go91
1 files changed, 67 insertions, 24 deletions
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go
index 802e82b5f..d7457fabe 100644
--- a/weed/s3api/s3api_object_handlers_delete.go
+++ b/weed/s3api/s3api_object_handlers_delete.go
@@ -29,44 +29,87 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object)
- target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
- dir, name := target.DirAndName()
+ // Check for specific version ID in query parameters
+ versionId := r.URL.Query().Get("versionId")
- var auditLog *s3err.AccessLog
+ // Check if versioning is enabled for the bucket
+ versioningEnabled, err := s3a.isVersioningEnabled(bucket)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
+ return
+ }
+ glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+ var auditLog *s3err.AccessLog
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
- err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ if versioningEnabled {
+ // Handle versioned delete
+ if versionId != "" {
+ // Delete specific version
+ err := s3a.deleteSpecificObjectVersion(bucket, object, versionId)
+ if err != nil {
+ glog.Errorf("Failed to delete specific version %s: %v", versionId, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
- if err := doDeleteEntry(client, dir, name, true, false); err != nil {
- return err
- }
+ // Set version ID in response header
+ w.Header().Set("x-amz-version-id", versionId)
+ } else {
+ // Create delete marker (logical delete)
+ deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object)
+ if err != nil {
+ glog.Errorf("Failed to create delete marker: %v", err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
- if auditLog != nil {
- auditLog.Key = name
- s3err.PostAccessLog(*auditLog)
+ // Set delete marker version ID in response header
+ w.Header().Set("x-amz-version-id", deleteMarkerVersionId)
+ w.Header().Set("x-amz-delete-marker", "true")
}
+ } else {
+ // Handle regular delete (non-versioned)
+ target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
+ dir, name := target.DirAndName()
- if s3a.option.AllowEmptyFolder {
- return nil
- }
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+
+ if err := doDeleteEntry(client, dir, name, true, false); err != nil {
+ return err
+ }
- directoriesWithDeletion := make(map[string]int)
- if strings.LastIndex(object, "/") > 0 {
- directoriesWithDeletion[dir]++
- // purge empty folders, only checking folders with deletions
- for len(directoriesWithDeletion) > 0 {
- directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
+ if s3a.option.AllowEmptyFolder {
+ return nil
}
+
+ directoriesWithDeletion := make(map[string]int)
+ if strings.LastIndex(object, "/") > 0 {
+ directoriesWithDeletion[dir]++
+ // purge empty folders, only checking folders with deletions
+ for len(directoriesWithDeletion) > 0 {
+ directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
+ }
+ }
+
+ return nil
+ })
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
}
+ }
- return nil
- })
- if err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
+ if auditLog != nil {
+ auditLog.Key = strings.TrimPrefix(object, "/")
+ s3err.PostAccessLog(*auditLog)
}
stats_collect.RecordBucketActiveTime(bucket)