aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_delete.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_delete.go')
-rw-r--r--weed/s3api/s3api_object_handlers_delete.go121
1 files changed, 91 insertions, 30 deletions
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go
index b2d9c51c9..8cb5c04fe 100644
--- a/weed/s3api/s3api_object_handlers_delete.go
+++ b/weed/s3api/s3api_object_handlers_delete.go
@@ -32,8 +32,8 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
- // Check if versioning is enabled for the bucket
- versioningEnabled, err := s3a.isVersioningEnabled(bucket)
+ // Check if versioning is configured for the bucket (Enabled or Suspended)
+ versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -49,7 +49,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
- if versioningEnabled {
+ if versioningConfigured {
// Handle versioned delete
if versionId != "" {
// Check object lock permissions before deleting specific version
@@ -137,8 +137,10 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// ObjectIdentifier represents an object to be deleted with its key name and optional version ID.
type ObjectIdentifier struct {
- Key string `xml:"Key"`
- VersionId string `xml:"VersionId,omitempty"`
+ Key string `xml:"Key"`
+ VersionId string `xml:"VersionId,omitempty"`
+ DeleteMarker bool `xml:"DeleteMarker,omitempty"`
+ DeleteMarkerVersionId string `xml:"DeleteMarkerVersionId,omitempty"`
}
// DeleteObjectsRequest - xml carrying the object key names which needs to be deleted.
@@ -201,8 +203,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
- // Check if versioning is enabled for the bucket (needed for object lock checks)
- versioningEnabled, err := s3a.isVersioningEnabled(bucket)
+ // Check if versioning is configured for the bucket (needed for object lock checks)
+ versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -222,7 +224,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
// Check object lock permissions before deletion (only for versioned buckets)
- if versioningEnabled {
+ if versioningConfigured {
// Validate governance bypass for this specific object
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object.Key)
if err := s3a.enforceObjectLockProtections(r, bucket, object.Key, object.VersionId, governanceBypassAllowed); err != nil {
@@ -236,31 +238,90 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
continue
}
}
- lastSeparator := strings.LastIndex(object.Key, "/")
- parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
- if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
- entryName = object.Key[lastSeparator+1:]
- parentDirectoryPath = "/" + object.Key[:lastSeparator]
- }
- parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
-
- err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
- if err == nil {
- directoriesWithDeletion[parentDirectoryPath]++
- deletedObjects = append(deletedObjects, object)
- } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
- deletedObjects = append(deletedObjects, object)
+
+ var deleteVersionId string
+ var isDeleteMarker bool
+
+ if versioningConfigured {
+ // Handle versioned delete
+ if object.VersionId != "" {
+ // Delete specific version
+ err := s3a.deleteSpecificObjectVersion(bucket, object.Key, object.VersionId)
+ if err != nil {
+ deleteErrors = append(deleteErrors, DeleteError{
+ Code: "",
+ Message: err.Error(),
+ Key: object.Key,
+ VersionId: object.VersionId,
+ })
+ continue
+ }
+ deleteVersionId = object.VersionId
+ } else {
+ // Create delete marker (logical delete)
+ deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object.Key)
+ if err != nil {
+ deleteErrors = append(deleteErrors, DeleteError{
+ Code: "",
+ Message: err.Error(),
+ Key: object.Key,
+ VersionId: object.VersionId,
+ })
+ continue
+ }
+ deleteVersionId = deleteMarkerVersionId
+ isDeleteMarker = true
+ }
+
+ // Add to successful deletions with version info
+ deletedObject := ObjectIdentifier{
+ Key: object.Key,
+ VersionId: deleteVersionId,
+ DeleteMarker: isDeleteMarker,
+ }
+
+ // For delete markers, also set DeleteMarkerVersionId field
+ if isDeleteMarker {
+ deletedObject.DeleteMarkerVersionId = deleteVersionId
+ // Don't set VersionId for delete markers, use DeleteMarkerVersionId instead
+ deletedObject.VersionId = ""
+ }
+ if !deleteObjects.Quiet {
+ deletedObjects = append(deletedObjects, deletedObject)
+ }
+ if isDeleteMarker {
+ // For delete markers, we don't need to track directories for cleanup
+ continue
+ }
} else {
- delete(directoriesWithDeletion, parentDirectoryPath)
- deleteErrors = append(deleteErrors, DeleteError{
- Code: "",
- Message: err.Error(),
- Key: object.Key,
- VersionId: object.VersionId,
- })
+ // Handle non-versioned delete (original logic)
+ lastSeparator := strings.LastIndex(object.Key, "/")
+ parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
+ if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
+ entryName = object.Key[lastSeparator+1:]
+ parentDirectoryPath = "/" + object.Key[:lastSeparator]
+ }
+ parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
+
+ err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
+ if err == nil {
+ directoriesWithDeletion[parentDirectoryPath]++
+ deletedObjects = append(deletedObjects, object)
+ } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
+ deletedObjects = append(deletedObjects, object)
+ } else {
+ delete(directoriesWithDeletion, parentDirectoryPath)
+ deleteErrors = append(deleteErrors, DeleteError{
+ Code: "",
+ Message: err.Error(),
+ Key: object.Key,
+ VersionId: object.VersionId,
+ })
+ }
}
+
if auditLog != nil {
- auditLog.Key = entryName
+ auditLog.Key = object.Key
s3err.PostAccessLog(*auditLog)
}
}