aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/s3/retention/s3_object_lock_headers_test.go2
-rw-r--r--test/s3/retention/s3_retention_test.go13
-rw-r--r--weed/s3api/s3api_bucket_config.go13
-rw-r--r--weed/s3api/s3api_bucket_handlers.go78
-rw-r--r--weed/s3api/s3api_object_handlers_delete.go1
-rw-r--r--weed/s3api/s3api_object_handlers_put.go30
-rw-r--r--weed/s3api/s3api_object_retention.go18
7 files changed, 113 insertions, 42 deletions
diff --git a/test/s3/retention/s3_object_lock_headers_test.go b/test/s3/retention/s3_object_lock_headers_test.go
index bf7283617..fad9e6fbb 100644
--- a/test/s3/retention/s3_object_lock_headers_test.go
+++ b/test/s3/retention/s3_object_lock_headers_test.go
@@ -236,7 +236,7 @@ func TestObjectLockHeadersNonVersionedBucket(t *testing.T) {
bucketName := getNewBucketName()
// Create regular bucket without object lock/versioning
- createBucket(t, client, bucketName)
+ createBucketWithoutObjectLock(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
key := "test-non-versioned"
diff --git a/test/s3/retention/s3_retention_test.go b/test/s3/retention/s3_retention_test.go
index 8477a50bf..4abdf6d87 100644
--- a/test/s3/retention/s3_retention_test.go
+++ b/test/s3/retention/s3_retention_test.go
@@ -69,9 +69,20 @@ func getNewBucketName() string {
return fmt.Sprintf("%s%d", defaultConfig.BucketPrefix, timestamp)
}
-// createBucket creates a new bucket for testing
+// createBucket creates a new bucket for testing with Object Lock enabled
+// Object Lock is required for retention and legal hold functionality per AWS S3 specification
func createBucket(t *testing.T, client *s3.Client, bucketName string) {
_, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
+ Bucket: aws.String(bucketName),
+ ObjectLockEnabledForBucket: aws.Bool(true),
+ })
+ require.NoError(t, err)
+}
+
+// createBucketWithoutObjectLock creates a new bucket without Object Lock enabled
+// Use this only for tests that specifically need to verify non-Object-Lock bucket behavior
+func createBucketWithoutObjectLock(t *testing.T, client *s3.Client, bucketName string) {
+ _, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err)
diff --git a/weed/s3api/s3api_bucket_config.go b/weed/s3api/s3api_bucket_config.go
index a10374339..6076f0108 100644
--- a/weed/s3api/s3api_bucket_config.go
+++ b/weed/s3api/s3api_bucket_config.go
@@ -514,6 +514,19 @@ func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
return config.Versioning != "" || config.ObjectLockConfig != nil, nil
}
+// isObjectLockEnabled checks if Object Lock is enabled for a bucket (with caching)
+func (s3a *S3ApiServer) isObjectLockEnabled(bucket string) (bool, error) {
+ config, errCode := s3a.getBucketConfig(bucket)
+ if errCode != s3err.ErrNone {
+ if errCode == s3err.ErrNoSuchBucket {
+ return false, filer_pb.ErrNotFound
+ }
+ return false, fmt.Errorf("failed to get bucket config: %v", errCode)
+ }
+
+ return config.ObjectLockConfig != nil, nil
+}
+
// getVersioningState returns the detailed versioning state for a bucket
func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
config, errCode := s3a.getBucketConfig(bucket)
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index f0704fe23..a810dfd37 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -244,46 +244,64 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
return
}
- // create the folder for bucket, but lazily create actual collection
- if err := s3a.mkdir(s3a.option.BucketsPath, bucket, setBucketOwner(r)); err != nil {
- glog.Errorf("PutBucketHandler mkdir: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
+ // Check for x-amz-bucket-object-lock-enabled header BEFORE creating bucket
+ // This allows us to create the bucket with Object Lock configuration atomically
+ objectLockEnabled := strings.EqualFold(r.Header.Get(s3_constants.AmzBucketObjectLockEnabled), "true")
- // Remove bucket from negative cache after successful creation
- if s3a.bucketConfigCache != nil {
- s3a.bucketConfigCache.RemoveNegativeCache(bucket)
- }
+ // Capture any Object Lock configuration error from within the callback
+ // The mkdir callback doesn't support returning errors, so we capture it here
+ var objectLockSetupError error
- // Check for x-amz-bucket-object-lock-enabled header (S3 standard compliance)
- if objectLockHeaderValue := r.Header.Get(s3_constants.AmzBucketObjectLockEnabled); strings.EqualFold(objectLockHeaderValue, "true") {
- glog.V(3).Infof("PutBucketHandler: enabling Object Lock and Versioning for bucket %s due to x-amz-bucket-object-lock-enabled header", bucket)
+ // Create the folder for bucket with all settings atomically
+ // This ensures Object Lock configuration is set in the same CreateEntry call,
+ // preventing race conditions where the bucket exists without Object Lock enabled
+ if err := s3a.mkdir(s3a.option.BucketsPath, bucket, func(entry *filer_pb.Entry) {
+ // Set bucket owner
+ setBucketOwner(r)(entry)
+
+ // Set Object Lock configuration atomically during bucket creation
+ if objectLockEnabled {
+ glog.V(3).Infof("PutBucketHandler: enabling Object Lock and Versioning for bucket %s atomically", bucket)
+
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
- // Atomically update the configuration of the specified bucket. See the updateBucketConfig
- // function definition for detailed documentation on parameters and behavior.
- errCode := s3a.updateBucketConfig(bucket, func(bucketConfig *BucketConfig) error {
// Enable versioning (required for Object Lock)
- bucketConfig.Versioning = s3_constants.VersioningEnabled
+ entry.Extended[s3_constants.ExtVersioningKey] = []byte(s3_constants.VersioningEnabled)
- // Create basic Object Lock configuration (enabled without default retention)
+ // Create and store Object Lock configuration
objectLockConfig := &ObjectLockConfiguration{
ObjectLockEnabled: s3_constants.ObjectLockEnabled,
}
+ if err := StoreObjectLockConfigurationInExtended(entry, objectLockConfig); err != nil {
+ glog.Errorf("PutBucketHandler: failed to store Object Lock config for bucket %s: %v", bucket, err)
+ objectLockSetupError = err
+ // Note: The entry will still be created, but we'll roll it back below
+ } else {
+ glog.V(3).Infof("PutBucketHandler: set ObjectLockConfig for bucket %s: %+v", bucket, objectLockConfig)
+ }
+ }
+ }); err != nil {
+ glog.Errorf("PutBucketHandler mkdir: %v", err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
- // Set the cached Object Lock configuration
- bucketConfig.ObjectLockConfig = objectLockConfig
- glog.V(3).Infof("PutBucketHandler: set ObjectLockConfig for bucket %s: %+v", bucket, objectLockConfig)
-
- return nil
- })
-
- if errCode != s3err.ErrNone {
- glog.Errorf("PutBucketHandler: failed to enable Object Lock for bucket %s: %v", bucket, errCode)
- s3err.WriteErrorResponse(w, r, errCode)
- return
+ // If Object Lock setup failed, roll back the bucket creation
+ // This ensures we don't leave a bucket without the requested Object Lock configuration
+ if objectLockSetupError != nil {
+ glog.Errorf("PutBucketHandler: rolling back bucket %s creation due to Object Lock setup failure: %v", bucket, objectLockSetupError)
+ if deleteErr := s3a.rm(s3a.option.BucketsPath, bucket, true, true); deleteErr != nil {
+ glog.Errorf("PutBucketHandler: failed to rollback bucket %s after Object Lock setup failure: %v", bucket, deleteErr)
}
- glog.V(3).Infof("PutBucketHandler: enabled Object Lock and Versioning for bucket %s", bucket)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+
+ // Remove bucket from negative cache after successful creation
+ if s3a.bucketConfigCache != nil {
+ s3a.bucketConfigCache.RemoveNegativeCache(bucket)
}
w.Header().Set("Location", "/"+bucket)
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go
index 6e373bb4e..da0b78654 100644
--- a/weed/s3api/s3api_object_handlers_delete.go
+++ b/weed/s3api/s3api_object_handlers_delete.go
@@ -129,6 +129,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
// which listens to metadata events and uses consistent hashing for coordination
})
+
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index f848790de..3da9047ac 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -30,14 +30,14 @@ import (
// Object lock validation errors
var (
- ErrObjectLockVersioningRequired = errors.New("object lock headers can only be used on versioned buckets")
+ ErrObjectLockVersioningRequired = errors.New("object lock headers can only be used on buckets with Object Lock enabled")
ErrInvalidObjectLockMode = errors.New("invalid object lock mode")
ErrInvalidLegalHoldStatus = errors.New("invalid legal hold status")
ErrInvalidRetentionDateFormat = errors.New("invalid retention until date format")
ErrRetentionDateMustBeFuture = errors.New("retain until date must be in the future")
ErrObjectLockModeRequiresDate = errors.New("object lock mode requires retention until date")
ErrRetentionDateRequiresMode = errors.New("retention until date requires object lock mode")
- ErrGovernanceBypassVersioningRequired = errors.New("governance bypass header can only be used on versioned buckets")
+ ErrGovernanceBypassVersioningRequired = errors.New("governance bypass header can only be used on buckets with Object Lock enabled")
ErrInvalidObjectLockDuration = errors.New("object lock duration must be greater than 0 days")
ErrObjectLockDurationExceeded = errors.New("object lock duration exceeds maximum allowed days")
ErrObjectLockConfigurationMissingEnabled = errors.New("object lock configuration must specify ObjectLockEnabled")
@@ -159,8 +159,16 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
glog.V(3).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
+ // Check if Object Lock is enabled for this bucket
+ objectLockEnabled, err := s3a.isObjectLockEnabled(bucket)
+ if err != nil && !errors.Is(err, filer_pb.ErrNotFound) {
+ glog.Errorf("Error checking Object Lock status for bucket %s: %v", bucket, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+
// Validate object lock headers before processing
- if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
+ if err := s3a.validateObjectLockHeaders(r, objectLockEnabled); err != nil {
glog.V(2).Infof("PutObjectHandler: object lock header validation failed for bucket %s, object %s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, mapValidationErrorToS3Error(err))
return
@@ -1311,7 +1319,8 @@ func (s3a *S3ApiServer) applyBucketDefaultRetention(bucket string, entry *filer_
}
// validateObjectLockHeaders validates object lock headers in PUT requests
-func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, versioningEnabled bool) error {
+// objectLockEnabled should be true only if the bucket has Object Lock configured
+func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, objectLockEnabled bool) error {
// Extract object lock headers from request
mode := r.Header.Get(s3_constants.AmzObjectLockMode)
retainUntilDateStr := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)
@@ -1320,8 +1329,11 @@ func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, versioningEna
// Check if any object lock headers are present
hasObjectLockHeaders := mode != "" || retainUntilDateStr != "" || legalHold != ""
- // Object lock headers can only be used on versioned buckets
- if hasObjectLockHeaders && !versioningEnabled {
+ // Object lock headers can only be used on buckets with Object Lock enabled
+ // Per AWS S3: Object Lock can only be enabled at bucket creation, and once enabled,
+ // objects can have retention/legal-hold metadata. Without Object Lock enabled,
+ // these headers must be rejected.
+ if hasObjectLockHeaders && !objectLockEnabled {
return ErrObjectLockVersioningRequired
}
@@ -1362,11 +1374,11 @@ func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, versioningEna
}
}
- // Check for governance bypass header - only valid for versioned buckets
+ // Check for governance bypass header - only valid for buckets with Object Lock enabled
bypassGovernance := r.Header.Get("x-amz-bypass-governance-retention") == "true"
- // Governance bypass headers are only valid for versioned buckets (like object lock headers)
- if bypassGovernance && !versioningEnabled {
+ // Governance bypass headers are only valid for buckets with Object Lock enabled
+ if bypassGovernance && !objectLockEnabled {
return ErrGovernanceBypassVersioningRequired
}
diff --git a/weed/s3api/s3api_object_retention.go b/weed/s3api/s3api_object_retention.go
index ef298eb43..328e938c5 100644
--- a/weed/s3api/s3api_object_retention.go
+++ b/weed/s3api/s3api_object_retention.go
@@ -586,10 +586,26 @@ func (s3a *S3ApiServer) evaluateGovernanceBypassRequest(r *http.Request, bucket,
// enforceObjectLockProtections enforces object lock protections for operations
func (s3a *S3ApiServer) enforceObjectLockProtections(request *http.Request, bucket, object, versionId string, governanceBypassAllowed bool) error {
+ // Quick check: if bucket doesn't have Object Lock enabled, skip the expensive entry lookup
+ // This optimization avoids a filer gRPC call for every DELETE operation on buckets without Object Lock
+ objectLockEnabled, err := s3a.isObjectLockEnabled(bucket)
+ if err != nil {
+ if errors.Is(err, filer_pb.ErrNotFound) {
+ // Bucket does not exist, so no protections to enforce
+ return nil
+ }
+ // For other errors, we can't determine lock status, so we should fail.
+ glog.Errorf("enforceObjectLockProtections: failed to check object lock for bucket %s: %v", bucket, err)
+ return err
+ }
+ if !objectLockEnabled {
+ // Object Lock is not enabled on this bucket, no protections to enforce
+ return nil
+ }
+
// Get the object entry to check both retention and legal hold
// For delete operations without versionId, we need to check the latest version
var entry *filer_pb.Entry
- var err error
if versionId != "" {
// Check specific version