diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers_delete.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_delete.go | 121 |
1 files changed, 91 insertions, 30 deletions
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index b2d9c51c9..8cb5c04fe 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -32,8 +32,8 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque // Check for specific version ID in query parameters versionId := r.URL.Query().Get("versionId") - // Check if versioning is enabled for the bucket - versioningEnabled, err := s3a.isVersioningEnabled(bucket) + // Check if versioning is configured for the bucket (Enabled or Suspended) + versioningConfigured, err := s3a.isVersioningConfigured(bucket) if err != nil { if err == filer_pb.ErrNotFound { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) @@ -49,7 +49,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) } - if versioningEnabled { + if versioningConfigured { // Handle versioned delete if versionId != "" { // Check object lock permissions before deleting specific version @@ -137,8 +137,10 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque // ObjectIdentifier represents an object to be deleted with its key name and optional version ID. type ObjectIdentifier struct { - Key string `xml:"Key"` - VersionId string `xml:"VersionId,omitempty"` + Key string `xml:"Key"` + VersionId string `xml:"VersionId,omitempty"` + DeleteMarker bool `xml:"DeleteMarker,omitempty"` + DeleteMarkerVersionId string `xml:"DeleteMarkerVersionId,omitempty"` } // DeleteObjectsRequest - xml carrying the object key names which needs to be deleted. @@ -201,8 +203,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) } - // Check if versioning is enabled for the bucket (needed for object lock checks) - versioningEnabled, err := s3a.isVersioningEnabled(bucket) + // Check if versioning is configured for the bucket (needed for object lock checks) + versioningConfigured, err := s3a.isVersioningConfigured(bucket) if err != nil { if err == filer_pb.ErrNotFound { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) @@ -222,7 +224,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } // Check object lock permissions before deletion (only for versioned buckets) - if versioningEnabled { + if versioningConfigured { // Validate governance bypass for this specific object governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object.Key) if err := s3a.enforceObjectLockProtections(r, bucket, object.Key, object.VersionId, governanceBypassAllowed); err != nil { @@ -236,31 +238,90 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h continue } } - lastSeparator := strings.LastIndex(object.Key, "/") - parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false - if lastSeparator > 0 && lastSeparator+1 < len(object.Key) { - entryName = object.Key[lastSeparator+1:] - parentDirectoryPath = "/" + object.Key[:lastSeparator] - } - parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath) - - err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) - if err == nil { - directoriesWithDeletion[parentDirectoryPath]++ - deletedObjects = append(deletedObjects, object) - } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { - deletedObjects = append(deletedObjects, object) + + var deleteVersionId string + var isDeleteMarker bool + + if versioningConfigured { + // Handle versioned delete + if object.VersionId != "" { + // Delete specific version + err := s3a.deleteSpecificObjectVersion(bucket, object.Key, object.VersionId) + if err != nil { + deleteErrors = append(deleteErrors, DeleteError{ + Code: "", + Message: err.Error(), + Key: object.Key, + VersionId: object.VersionId, + }) + continue + } + deleteVersionId = object.VersionId + } else { + // Create delete marker (logical delete) + deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object.Key) + if err != nil { + deleteErrors = append(deleteErrors, DeleteError{ + Code: "", + Message: err.Error(), + Key: object.Key, + VersionId: object.VersionId, + }) + continue + } + deleteVersionId = deleteMarkerVersionId + isDeleteMarker = true + } + + // Add to successful deletions with version info + deletedObject := ObjectIdentifier{ + Key: object.Key, + VersionId: deleteVersionId, + DeleteMarker: isDeleteMarker, + } + + // For delete markers, also set DeleteMarkerVersionId field + if isDeleteMarker { + deletedObject.DeleteMarkerVersionId = deleteVersionId + // Don't set VersionId for delete markers, use DeleteMarkerVersionId instead + deletedObject.VersionId = "" + } + if !deleteObjects.Quiet { + deletedObjects = append(deletedObjects, deletedObject) + } + if isDeleteMarker { + // For delete markers, we don't need to track directories for cleanup + continue + } } else { - delete(directoriesWithDeletion, parentDirectoryPath) - deleteErrors = append(deleteErrors, DeleteError{ - Code: "", - Message: err.Error(), - Key: object.Key, - VersionId: object.VersionId, - }) + // Handle non-versioned delete (original logic) + lastSeparator := strings.LastIndex(object.Key, "/") + parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false + if lastSeparator > 0 && lastSeparator+1 < len(object.Key) { + entryName = object.Key[lastSeparator+1:] + parentDirectoryPath = "/" + object.Key[:lastSeparator] + } + parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath) + + err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) + if err == nil { + directoriesWithDeletion[parentDirectoryPath]++ + deletedObjects = append(deletedObjects, object) + } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { + deletedObjects = append(deletedObjects, object) + } else { + delete(directoriesWithDeletion, parentDirectoryPath) + deleteErrors = append(deleteErrors, DeleteError{ + Code: "", + Message: err.Error(), + Key: object.Key, + VersionId: object.VersionId, + }) + } } + if auditLog != nil { - auditLog.Key = entryName + auditLog.Key = object.Key s3err.PostAccessLog(*auditLog) } } |
