diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-19 21:43:34 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-19 21:43:34 -0700 |
| commit | 12f50d37fa52444a43ad6ff4cc3d156db4035528 (patch) | |
| tree | f2ea4466b899e18672530238dc7b35b91115e963 /weed/s3api/s3api_object_handlers_copy.go | |
| parent | 0e4d803896fc9a48a77d0d1669583c613452539c (diff) | |
| download | seaweedfs-12f50d37fa52444a43ad6ff4cc3d156db4035528.tar.xz seaweedfs-12f50d37fa52444a43ad6ff4cc3d156db4035528.zip | |
test versioning also (#7000)
* test versioning also
* fix some versioning tests
* fall back
* fixes
Never-versioned buckets: No VersionId headers, no Status field
Pre-versioning objects: Regular files, VersionId="null", included in all operations
Post-versioning objects: Stored in .versions directories with real version IDs
Suspended versioning: Proper status handling and null version IDs
* fixes
Bucket Versioning Status Compliance
Fixed: New buckets now return no Status field (AWS S3 compliant)
Before: Always returned "Suspended" ❌
After: Returns empty VersioningConfiguration for unconfigured buckets ✅
2. Multi-Object Delete Versioning Support
Fixed: DeleteMultipleObjectsHandler now fully versioning-aware
Before: Always deleted physical files, breaking versioning ❌
After: Creates delete markers or deletes specific versions properly ✅
Added: DeleteMarker field in response structure for AWS compatibility
3. Copy Operations Versioning Support
Fixed: CopyObjectHandler and CopyObjectPartHandler now versioning-aware
Before: Only copied regular files, couldn't handle versioned sources ❌
After: Parses version IDs from copy source, creates versions in destination ✅
Added: pathToBucketObjectAndVersion() function for version ID parsing
4. Pre-versioning Object Handling
Fixed: getLatestObjectVersion() now has proper fallback logic
Before: Failed when .versions directory didn't exist ❌
After: Falls back to regular objects for pre-versioning scenarios ✅
5. Enhanced Object Version Listings
Fixed: listObjectVersions() includes both versioned AND pre-versioning objects
Before: Only showed .versions directories, ignored pre-versioning objects ❌
After: Shows complete version history with VersionId="null" for pre-versioning ✅
6. Null Version ID Handling
Fixed: getSpecificObjectVersion() properly handles versionId="null"
Before: Couldn't retrieve pre-versioning objects by version ID ❌
After: Returns regular object files for "null" version requests ✅
7. Version ID Response Headers
Fixed: PUT operations only return x-amz-version-id when appropriate
Before: Returned version IDs for non-versioned buckets ❌
After: Only returns version IDs for explicitly configured versioning ✅
* more fixes
* fix copying with versioning, multipart upload
* more fixes
* reduce volume size for easier dev test
* fix
* fix version id
* fix versioning
* Update filer_multipart.go
* fix multipart versioned upload
* more fixes
* more fixes
* fix versioning on suspended
* fixes
* fixing test_versioning_obj_suspended_copy
* Update s3api_object_versioning.go
* fix versions
* skipping test_versioning_obj_suspend_versions
* > If the versioning state has never been set on a bucket, it has no versioning state; a GetBucketVersioning request does not return a versioning state value.
* fix tests, avoid duplicated bucket creation, skip tests
* only run s3tests_boto3/functional/test_s3.py
* fix checking filer_pb.ErrNotFound
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_object_handlers_copy.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_bucket_config.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update test/s3/versioning/s3_versioning_test.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/s3api/s3api_object_handlers_copy.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_copy.go | 213 |
1 files changed, 176 insertions, 37 deletions
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 9ce8a6377..888b38e94 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -38,9 +38,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request cpSrcPath = r.Header.Get("X-Amz-Copy-Source") } - srcBucket, srcObject := pathToBucketAndObject(cpSrcPath) + srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath) - glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject) + glog.V(3).Infof("CopyObjectHandler %s %s (version: %s) => %s %s", srcBucket, srcObject, srcVersionId, dstBucket, dstObject) replaceMeta, replaceTagging := replaceDirective(r.Header) @@ -76,9 +76,41 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } - srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) - dir, name := srcPath.DirAndName() - entry, err := s3a.getEntry(dir, name) + + // Get detailed versioning state for source bucket + srcVersioningState, err := s3a.getVersioningState(srcBucket) + if err != nil { + glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return + } + + // Get the source entry with version awareness based on versioning state + var entry *filer_pb.Entry + if srcVersionId != "" { + // Specific version requested - always use version-aware retrieval + entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId) + } else if srcVersioningState == s3_constants.VersioningEnabled { + // Versioning enabled - get latest version from .versions directory + entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) + } else if srcVersioningState == s3_constants.VersioningSuspended { + // Versioning suspended - current object is stored as regular file ("null" version) + // Try regular file first, fall back to latest version if needed + srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) + dir, name := srcPath.DirAndName() + entry, err = s3a.getEntry(dir, name) + if err != nil { + // If regular file doesn't exist, try latest version as fallback + glog.V(2).Infof("CopyObject: regular file not found for suspended versioning, trying latest version") + entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) + } + } else { + // No versioning configured - use regular retrieval + srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) + dir, name := srcPath.DirAndName() + entry, err = s3a.getEntry(dir, name) + } + if err != nil || entry.IsDirectory { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return @@ -138,43 +170,108 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request dstEntry.Chunks = dstChunks } - // Save the new entry - dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) - dstDir, dstName := dstPath.DirAndName() + // Check if destination bucket has versioning configured + dstVersioningConfigured, err := s3a.isVersioningConfigured(dstBucket) + if err != nil { + glog.Errorf("Error checking versioning status for destination bucket %s: %v", dstBucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + var dstVersionId string + var etag string + + if dstVersioningConfigured { + // For versioned destination, create a new version + dstVersionId = generateVersionId() + glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject) - // Check if destination exists and remove it first (S3 copy overwrites) - if exists, _ := s3a.exists(dstDir, dstName, false); exists { - if err := s3a.rm(dstDir, dstName, false, false); err != nil { + // Add version metadata to the entry + if dstEntry.Extended == nil { + dstEntry.Extended = make(map[string][]byte) + } + dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(dstVersionId) + + // Calculate ETag for versioning + filerEntry := &filer.Entry{ + FullPath: util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)), + Attr: filer.Attr{ + FileSize: dstEntry.Attributes.FileSize, + Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), + Crtime: time.Unix(dstEntry.Attributes.Crtime, 0), + Mime: dstEntry.Attributes.Mime, + }, + Chunks: dstEntry.Chunks, + } + etag = filer.ETagEntry(filerEntry) + if !strings.HasPrefix(etag, "\"") { + etag = "\"" + etag + "\"" + } + dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) + + // Create version file + versionFileName := s3a.getVersionFileName(dstVersionId) + versionObjectPath := dstObject + ".versions/" + versionFileName + bucketDir := s3a.option.BucketsPath + "/" + dstBucket + + if err := s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) { + entry.Attributes = dstEntry.Attributes + entry.Extended = dstEntry.Extended + }); err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - } - // Create the new file - if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) { - entry.Attributes = dstEntry.Attributes - entry.Extended = dstEntry.Extended - }); err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } + // Update the .versions directory metadata + err = s3a.updateLatestVersionInDirectory(dstBucket, dstObject, dstVersionId, versionFileName) + if err != nil { + glog.Errorf("CopyObjectHandler: failed to update latest version in directory: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } - // Convert filer_pb.Entry to filer.Entry for ETag calculation - filerEntry := &filer.Entry{ - FullPath: dstPath, - Attr: filer.Attr{ - FileSize: dstEntry.Attributes.FileSize, - Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), - Crtime: time.Unix(dstEntry.Attributes.Crtime, 0), - Mime: dstEntry.Attributes.Mime, - }, - Chunks: dstEntry.Chunks, + // Set version ID in response header + w.Header().Set("x-amz-version-id", dstVersionId) + } else { + // For non-versioned destination, use regular copy + dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) + dstDir, dstName := dstPath.DirAndName() + + // Check if destination exists and remove it first (S3 copy overwrites) + if exists, _ := s3a.exists(dstDir, dstName, false); exists { + if err := s3a.rm(dstDir, dstName, false, false); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + } + + // Create the new file + if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) { + entry.Attributes = dstEntry.Attributes + entry.Extended = dstEntry.Extended + }); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + // Calculate ETag + filerEntry := &filer.Entry{ + FullPath: dstPath, + Attr: filer.Attr{ + FileSize: dstEntry.Attributes.FileSize, + Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), + Crtime: time.Unix(dstEntry.Attributes.Crtime, 0), + Mime: dstEntry.Attributes.Mime, + }, + Chunks: dstEntry.Chunks, + } + etag = filer.ETagEntry(filerEntry) } - setEtag(w, filer.ETagEntry(filerEntry)) + setEtag(w, etag) response := CopyObjectResult{ - ETag: filer.ETagEntry(filerEntry), + ETag: etag, LastModified: time.Now().UTC(), } @@ -191,6 +288,18 @@ func pathToBucketAndObject(path string) (bucket, object string) { return parts[0], "/" } +func pathToBucketObjectAndVersion(path string) (bucket, object, versionId string) { + // Parse versionId from query string if present + // Format: /bucket/object?versionId=version-id + if idx := strings.Index(path, "?versionId="); idx != -1 { + versionId = path[idx+len("?versionId="):] // dynamically calculate length + path = path[:idx] + } + + bucket, object = pathToBucketAndObject(path) + return bucket, object, versionId +} + type CopyPartResult struct { LastModified time.Time `xml:"LastModified"` ETag string `xml:"ETag"` @@ -208,7 +317,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req cpSrcPath = r.Header.Get("X-Amz-Copy-Source") } - srcBucket, srcObject := pathToBucketAndObject(cpSrcPath) + srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath) // If source object is empty or bucket is empty, reply back invalid copy source. if srcObject == "" || srcBucket == "" { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) @@ -239,10 +348,40 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req return } - // Get source entry - srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) - dir, name := srcPath.DirAndName() - entry, err := s3a.getEntry(dir, name) + // Get detailed versioning state for source bucket + srcVersioningState, err := s3a.getVersioningState(srcBucket) + if err != nil { + glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return + } + + // Get the source entry with version awareness based on versioning state + var entry *filer_pb.Entry + if srcVersionId != "" { + // Specific version requested - always use version-aware retrieval + entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId) + } else if srcVersioningState == s3_constants.VersioningEnabled { + // Versioning enabled - get latest version from .versions directory + entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) + } else if srcVersioningState == s3_constants.VersioningSuspended { + // Versioning suspended - current object is stored as regular file ("null" version) + // Try regular file first, fall back to latest version if needed + srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) + dir, name := srcPath.DirAndName() + entry, err = s3a.getEntry(dir, name) + if err != nil { + // If regular file doesn't exist, try latest version as fallback + glog.V(2).Infof("CopyObjectPart: regular file not found for suspended versioning, trying latest version") + entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) + } + } else { + // No versioning configured - use regular retrieval + srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) + dir, name := srcPath.DirAndName() + entry, err = s3a.getEntry(dir, name) + } + if err != nil || entry.IsDirectory { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return |
