diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers_delete.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_delete.go | 91 |
1 files changed, 67 insertions, 24 deletions
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 802e82b5f..d7457fabe 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -29,44 +29,87 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque bucket, object := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object) - target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)) - dir, name := target.DirAndName() + // Check for specific version ID in query parameters + versionId := r.URL.Query().Get("versionId") - var auditLog *s3err.AccessLog + // Check if versioning is enabled for the bucket + versioningEnabled, err := s3a.isVersioningEnabled(bucket) + if err != nil { + if err == filer_pb.ErrNotFound { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) + return + } + glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + var auditLog *s3err.AccessLog if s3err.Logger != nil { auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) } - err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + if versioningEnabled { + // Handle versioned delete + if versionId != "" { + // Delete specific version + err := s3a.deleteSpecificObjectVersion(bucket, object, versionId) + if err != nil { + glog.Errorf("Failed to delete specific version %s: %v", versionId, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } - if err := doDeleteEntry(client, dir, name, true, false); err != nil { - return err - } + // Set version ID in response header + w.Header().Set("x-amz-version-id", versionId) + } else { + // Create delete marker (logical delete) + deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object) + if err != nil { + glog.Errorf("Failed to create delete marker: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } - if auditLog != nil { - auditLog.Key = name - s3err.PostAccessLog(*auditLog) + // Set delete marker version ID in response header + w.Header().Set("x-amz-version-id", deleteMarkerVersionId) + w.Header().Set("x-amz-delete-marker", "true") } + } else { + // Handle regular delete (non-versioned) + target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)) + dir, name := target.DirAndName() - if s3a.option.AllowEmptyFolder { - return nil - } + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + + if err := doDeleteEntry(client, dir, name, true, false); err != nil { + return err + } - directoriesWithDeletion := make(map[string]int) - if strings.LastIndex(object, "/") > 0 { - directoriesWithDeletion[dir]++ - // purge empty folders, only checking folders with deletions - for len(directoriesWithDeletion) > 0 { - directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion) + if s3a.option.AllowEmptyFolder { + return nil } + + directoriesWithDeletion := make(map[string]int) + if strings.LastIndex(object, "/") > 0 { + directoriesWithDeletion[dir]++ + // purge empty folders, only checking folders with deletions + for len(directoriesWithDeletion) > 0 { + directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion) + } + } + + return nil + }) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return } + } - return nil - }) - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return + if auditLog != nil { + auditLog.Key = strings.TrimPrefix(object, "/") + s3err.PostAccessLog(*auditLog) } stats_collect.RecordBucketActiveTime(bucket) |
