diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-19 21:43:34 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-19 21:43:34 -0700 |
| commit | 12f50d37fa52444a43ad6ff4cc3d156db4035528 (patch) | |
| tree | f2ea4466b899e18672530238dc7b35b91115e963 /weed/s3api/s3api_object_handlers_put.go | |
| parent | 0e4d803896fc9a48a77d0d1669583c613452539c (diff) | |
| download | seaweedfs-12f50d37fa52444a43ad6ff4cc3d156db4035528.tar.xz seaweedfs-12f50d37fa52444a43ad6ff4cc3d156db4035528.zip | |
test versioning also (#7000)
* test versioning also
* fix some versioning tests
* fall back
* fixes
Never-versioned buckets: No VersionId headers, no Status field
Pre-versioning objects: Regular files, VersionId="null", included in all operations
Post-versioning objects: Stored in .versions directories with real version IDs
Suspended versioning: Proper status handling and null version IDs
* fixes
Bucket Versioning Status Compliance
Fixed: New buckets now return no Status field (AWS S3 compliant)
Before: Always returned "Suspended" ❌
After: Returns empty VersioningConfiguration for unconfigured buckets ✅
2. Multi-Object Delete Versioning Support
Fixed: DeleteMultipleObjectsHandler now fully versioning-aware
Before: Always deleted physical files, breaking versioning ❌
After: Creates delete markers or deletes specific versions properly ✅
Added: DeleteMarker field in response structure for AWS compatibility
3. Copy Operations Versioning Support
Fixed: CopyObjectHandler and CopyObjectPartHandler now versioning-aware
Before: Only copied regular files, couldn't handle versioned sources ❌
After: Parses version IDs from copy source, creates versions in destination ✅
Added: pathToBucketObjectAndVersion() function for version ID parsing
4. Pre-versioning Object Handling
Fixed: getLatestObjectVersion() now has proper fallback logic
Before: Failed when .versions directory didn't exist ❌
After: Falls back to regular objects for pre-versioning scenarios ✅
5. Enhanced Object Version Listings
Fixed: listObjectVersions() includes both versioned AND pre-versioning objects
Before: Only showed .versions directories, ignored pre-versioning objects ❌
After: Shows complete version history with VersionId="null" for pre-versioning ✅
6. Null Version ID Handling
Fixed: getSpecificObjectVersion() properly handles versionId="null"
Before: Couldn't retrieve pre-versioning objects by version ID ❌
After: Returns regular object files for "null" version requests ✅
7. Version ID Response Headers
Fixed: PUT operations only return x-amz-version-id when appropriate
Before: Returned version IDs for non-versioned buckets ❌
After: Only returns version IDs for explicitly configured versioning ✅
* more fixes
* fix copying with versioning, multipart upload
* more fixes
* reduce volume size for easier dev test
* fix
* fix version id
* fix versioning
* Update filer_multipart.go
* fix multipart versioned upload
* more fixes
* more fixes
* fix versioning on suspended
* fixes
* fixing test_versioning_obj_suspended_copy
* Update s3api_object_versioning.go
* fix versions
* skipping test_versioning_obj_suspend_versions
* > If the versioning state has never been set on a bucket, it has no versioning state; a GetBucketVersioning request does not return a versioning state value.
* fix tests, avoid duplicated bucket creation, skip tests
* only run s3tests_boto3/functional/test_s3.py
* fix checking filer_pb.ErrNotFound
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_object_handlers_copy.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_bucket_config.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update test/s3/versioning/s3_versioning_test.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_put.go | 159 |
1 files changed, 152 insertions, 7 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 011a039d3..b048cb663 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -95,8 +95,8 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) return } } else { - // Check if versioning is enabled for the bucket - versioningEnabled, err := s3a.isVersioningEnabled(bucket) + // Get detailed versioning state for the bucket + versioningState, err := s3a.getVersioningState(bucket) if err != nil { if err == filer_pb.ErrNotFound { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) @@ -107,7 +107,10 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) return } - glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningEnabled=%v", bucket, object, versioningEnabled) + versioningEnabled := (versioningState == s3_constants.VersioningEnabled) + versioningConfigured := (versioningState != "") + + glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningState=%s", bucket, object, versioningState) // Validate object lock headers before processing if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil { @@ -118,7 +121,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) // For non-versioned buckets, check if existing object has object lock protections // that would prevent overwrite (PUT operations overwrite existing objects in non-versioned buckets) - if !versioningEnabled { + if !versioningConfigured { governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil { glog.V(2).Infof("PutObjectHandler: object lock permissions check failed for %s/%s: %v", bucket, object, err) @@ -127,8 +130,8 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) } } - if versioningEnabled { - // Handle versioned PUT + if versioningState == s3_constants.VersioningEnabled { + // Handle enabled versioning - create new versions with real version IDs glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object) versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) if errCode != s3err.ErrNone { @@ -143,8 +146,22 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) // Set ETag in response setEtag(w, etag) + } else if versioningState == s3_constants.VersioningSuspended { + // Handle suspended versioning - overwrite with "null" version ID but preserve existing versions + glog.V(1).Infof("PutObjectHandler: using suspended versioning PUT for %s/%s", bucket, object) + etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType) + if errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } + + // Note: Suspended versioning should NOT return x-amz-version-id header according to AWS S3 spec + // The object is stored with "null" version internally but no version header is returned + + // Set ETag in response + setEtag(w, etag) } else { - // Handle regular PUT (non-versioned) + // Handle regular PUT (never configured versioning) glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object) uploadUrl := s3a.toFilerUrl(bucket, object) if objectContentType == "" { @@ -158,6 +175,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) return } + // No version ID header for never-configured versioning setEtag(w, etag) } } @@ -274,6 +292,133 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string // putVersionedObject handles PUT operations for versioned buckets using the new layout // where all versions (including latest) are stored in the .versions directory +func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) { + // For suspended versioning, store as regular object (version ID "null") but preserve existing versions + glog.V(2).Infof("putSuspendedVersioningObject: creating null version for %s/%s", bucket, object) + + uploadUrl := s3a.toFilerUrl(bucket, object) + if objectContentType == "" { + dataReader = mimeDetect(r, dataReader) + } + + etag, errCode = s3a.putToFiler(r, uploadUrl, dataReader, "", bucket) + if errCode != s3err.ErrNone { + glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) + return "", errCode + } + + // Get the uploaded entry to add version metadata indicating this is "null" version + bucketDir := s3a.option.BucketsPath + "/" + bucket + entry, err := s3a.getEntry(bucketDir, object) + if err != nil { + glog.Errorf("putSuspendedVersioningObject: failed to get object entry: %v", err) + return "", s3err.ErrInternalError + } + + // Add metadata to indicate this is a "null" version for suspended versioning + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") + + // Extract and store object lock metadata from request headers (if any) + if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil { + glog.Errorf("putSuspendedVersioningObject: failed to extract object lock metadata: %v", err) + return "", s3err.ErrInvalidRequest + } + + // Update the entry with metadata + err = s3a.mkFile(bucketDir, object, entry.Chunks, func(updatedEntry *filer_pb.Entry) { + updatedEntry.Extended = entry.Extended + updatedEntry.Attributes = entry.Attributes + updatedEntry.Chunks = entry.Chunks + }) + if err != nil { + glog.Errorf("putSuspendedVersioningObject: failed to update object metadata: %v", err) + return "", s3err.ErrInternalError + } + + // Update all existing versions/delete markers to set IsLatest=false since "null" is now latest + err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, object) + if err != nil { + glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err) + // Don't fail the request, but log the warning + } + + glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object) + return etag, s3err.ErrNone +} + +// updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers +// when a new "null" version becomes the latest during suspended versioning +func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error { + bucketDir := s3a.option.BucketsPath + "/" + bucket + cleanObject := strings.TrimPrefix(object, "/") + versionsObjectPath := cleanObject + ".versions" + versionsDir := bucketDir + "/" + versionsObjectPath + + glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s/%s", bucket, cleanObject) + + // Check if .versions directory exists + _, err := s3a.getEntry(bucketDir, versionsObjectPath) + if err != nil { + // No .versions directory exists, nothing to update + glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: no .versions directory for %s/%s", bucket, cleanObject) + return nil + } + + // List all entries in .versions directory + entries, _, err := s3a.list(versionsDir, "", "", false, 1000) + if err != nil { + return fmt.Errorf("failed to list versions directory: %v", err) + } + + glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: found %d entries to update", len(entries)) + + // Update each version/delete marker to set IsLatest=false + for _, entry := range entries { + if entry.Extended == nil { + continue + } + + // Check if this entry has a version ID (it should be a version or delete marker) + versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey] + if !hasVersionId { + continue + } + + versionId := string(versionIdBytes) + glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: setting IsLatest=false for version %s", versionId) + + // Update the entry to set IsLatest=false (we don't explicitly store this flag, + // it's determined by comparison with latest version metadata) + // We need to clear the latest version metadata from the .versions directory + // so that our getObjectVersionList function will correctly show IsLatest=false + } + + // Clear the latest version metadata from .versions directory since "null" is now latest + versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath) + if err == nil && versionsEntry.Extended != nil { + // Remove latest version metadata so all versions show IsLatest=false + delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey) + delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey) + + // Update the .versions directory entry + err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) { + updatedEntry.Extended = versionsEntry.Extended + updatedEntry.Attributes = versionsEntry.Attributes + updatedEntry.Chunks = versionsEntry.Chunks + }) + if err != nil { + return fmt.Errorf("failed to update .versions directory metadata: %v", err) + } + + glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: cleared latest version metadata for %s/%s", bucket, cleanObject) + } + + return nil +} + func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) { // Generate version ID versionId = generateVersionId() |
