aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_acl.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_acl.go')
-rw-r--r--weed/s3api/s3api_object_handlers_acl.go148
1 files changed, 134 insertions, 14 deletions
diff --git a/weed/s3api/s3api_object_handlers_acl.go b/weed/s3api/s3api_object_handlers_acl.go
index 7185f9896..1386b6cba 100644
--- a/weed/s3api/s3api_object_handlers_acl.go
+++ b/weed/s3api/s3api_object_handlers_acl.go
@@ -24,18 +24,63 @@ func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Reque
return
}
- // Check if object exists and get its metadata
- bucketDir := s3a.option.BucketsPath + "/" + bucket
- entry, err := s3a.getEntry(bucketDir, object)
+ // Check for specific version ID in query parameters
+ versionId := r.URL.Query().Get("versionId")
+
+ // Check if versioning is configured for the bucket (Enabled or Suspended)
+ versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
- if errors.Is(err, filer_pb.ErrNotFound) {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ if err == filer_pb.ErrNotFound {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
- glog.Errorf("GetObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
+ glog.Errorf("GetObjectAclHandler: Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
+
+ var entry *filer_pb.Entry
+
+ if versioningConfigured {
+ // Handle versioned object ACL retrieval - use same logic as GetObjectHandler
+ if versionId != "" {
+ // Request for specific version
+ glog.V(2).Infof("GetObjectAclHandler: requesting ACL for specific version %s of %s%s", versionId, bucket, object)
+ entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
+ } else {
+ // Request for latest version
+ glog.V(2).Infof("GetObjectAclHandler: requesting ACL for latest version of %s%s", bucket, object)
+ entry, err = s3a.getLatestObjectVersion(bucket, object)
+ }
+
+ if err != nil {
+ glog.Errorf("GetObjectAclHandler: Failed to get object version %s for %s%s: %v", versionId, bucket, object, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+
+ // Check if this is a delete marker
+ if entry.Extended != nil {
+ if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ }
+ } else {
+ // Handle regular (non-versioned) object ACL retrieval
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ entry, err = s3a.getEntry(bucketDir, object)
+ if err != nil {
+ if errors.Is(err, filer_pb.ErrNotFound) {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ glog.Errorf("GetObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+ }
+
if entry == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
@@ -123,18 +168,63 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque
return
}
- // Check if object exists and get its metadata
- bucketDir := s3a.option.BucketsPath + "/" + bucket
- entry, err := s3a.getEntry(bucketDir, object)
+ // Check for specific version ID in query parameters
+ versionId := r.URL.Query().Get("versionId")
+
+ // Check if versioning is configured for the bucket (Enabled or Suspended)
+ versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
- if errors.Is(err, filer_pb.ErrNotFound) {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ if err == filer_pb.ErrNotFound {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
- glog.Errorf("PutObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
+ glog.Errorf("PutObjectAclHandler: Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
+
+ var entry *filer_pb.Entry
+
+ if versioningConfigured {
+ // Handle versioned object ACL modification - use same logic as GetObjectHandler
+ if versionId != "" {
+ // Request for specific version
+ glog.V(2).Infof("PutObjectAclHandler: modifying ACL for specific version %s of %s%s", versionId, bucket, object)
+ entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
+ } else {
+ // Request for latest version
+ glog.V(2).Infof("PutObjectAclHandler: modifying ACL for latest version of %s%s", bucket, object)
+ entry, err = s3a.getLatestObjectVersion(bucket, object)
+ }
+
+ if err != nil {
+ glog.Errorf("PutObjectAclHandler: Failed to get object version %s for %s%s: %v", versionId, bucket, object, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+
+ // Check if this is a delete marker
+ if entry.Extended != nil {
+ if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ }
+ } else {
+ // Handle regular (non-versioned) object ACL modification
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ entry, err = s3a.getEntry(bucketDir, object)
+ if err != nil {
+ if errors.Is(err, filer_pb.ErrNotFound) {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ glog.Errorf("PutObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+ }
+
if entry == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
@@ -208,14 +298,44 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque
// Store ACL in object metadata
if errCode := AssembleEntryWithAcp(entry, objectOwner, grants); errCode != s3err.ErrNone {
glog.Errorf("PutObjectAclHandler: failed to assemble entry with ACP: %v", errCode)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
+ // Calculate the correct directory for ACL update
+ var updateDirectory string
+
+ if versioningConfigured {
+ if versionId != "" && versionId != "null" {
+ // Versioned object - update the specific version file in .versions directory
+ updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions"
+ } else {
+ // Latest version in versioned bucket - could be null version or versioned object
+ // Extract version ID from the entry to determine where it's stored
+ var actualVersionId string
+ if entry.Extended != nil {
+ if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
+ actualVersionId = string(versionIdBytes)
+ }
+ }
+
+ if actualVersionId == "null" || actualVersionId == "" {
+ // Null version (pre-versioning object) - stored as regular file
+ updateDirectory = s3a.option.BucketsPath + "/" + bucket
+ } else {
+ // Versioned object - stored in .versions directory
+ updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions"
+ }
+ }
+ } else {
+ // Non-versioned object - stored as regular file
+ updateDirectory = s3a.option.BucketsPath + "/" + bucket
+ }
+
// Update the object with new ACL metadata
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
- Directory: bucketDir,
+ Directory: updateDirectory,
Entry: entry,
}