diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers_copy.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_copy.go | 213 |
1 files changed, 176 insertions, 37 deletions
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 9ce8a6377..888b38e94 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -38,9 +38,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request cpSrcPath = r.Header.Get("X-Amz-Copy-Source") } - srcBucket, srcObject := pathToBucketAndObject(cpSrcPath) + srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath) - glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject) + glog.V(3).Infof("CopyObjectHandler %s %s (version: %s) => %s %s", srcBucket, srcObject, srcVersionId, dstBucket, dstObject) replaceMeta, replaceTagging := replaceDirective(r.Header) @@ -76,9 +76,41 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } - srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) - dir, name := srcPath.DirAndName() - entry, err := s3a.getEntry(dir, name) + + // Get detailed versioning state for source bucket + srcVersioningState, err := s3a.getVersioningState(srcBucket) + if err != nil { + glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return + } + + // Get the source entry with version awareness based on versioning state + var entry *filer_pb.Entry + if srcVersionId != "" { + // Specific version requested - always use version-aware retrieval + entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId) + } else if srcVersioningState == s3_constants.VersioningEnabled { + // Versioning enabled - get latest version from .versions directory + entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) + } else if srcVersioningState == s3_constants.VersioningSuspended { + // Versioning suspended - current object is stored as regular file ("null" version) + // Try regular file first, fall back to latest version if needed + srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) + dir, name := srcPath.DirAndName() + entry, err = s3a.getEntry(dir, name) + if err != nil { + // If regular file doesn't exist, try latest version as fallback + glog.V(2).Infof("CopyObject: regular file not found for suspended versioning, trying latest version") + entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) + } + } else { + // No versioning configured - use regular retrieval + srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) + dir, name := srcPath.DirAndName() + entry, err = s3a.getEntry(dir, name) + } + if err != nil || entry.IsDirectory { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return @@ -138,43 +170,108 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request dstEntry.Chunks = dstChunks } - // Save the new entry - dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) - dstDir, dstName := dstPath.DirAndName() + // Check if destination bucket has versioning configured + dstVersioningConfigured, err := s3a.isVersioningConfigured(dstBucket) + if err != nil { + glog.Errorf("Error checking versioning status for destination bucket %s: %v", dstBucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + var dstVersionId string + var etag string + + if dstVersioningConfigured { + // For versioned destination, create a new version + dstVersionId = generateVersionId() + glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject) - // Check if destination exists and remove it first (S3 copy overwrites) - if exists, _ := s3a.exists(dstDir, dstName, false); exists { - if err := s3a.rm(dstDir, dstName, false, false); err != nil { + // Add version metadata to the entry + if dstEntry.Extended == nil { + dstEntry.Extended = make(map[string][]byte) + } + dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(dstVersionId) + + // Calculate ETag for versioning + filerEntry := &filer.Entry{ + FullPath: util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)), + Attr: filer.Attr{ + FileSize: dstEntry.Attributes.FileSize, + Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), + Crtime: time.Unix(dstEntry.Attributes.Crtime, 0), + Mime: dstEntry.Attributes.Mime, + }, + Chunks: dstEntry.Chunks, + } + etag = filer.ETagEntry(filerEntry) + if !strings.HasPrefix(etag, "\"") { + etag = "\"" + etag + "\"" + } + dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) + + // Create version file + versionFileName := s3a.getVersionFileName(dstVersionId) + versionObjectPath := dstObject + ".versions/" + versionFileName + bucketDir := s3a.option.BucketsPath + "/" + dstBucket + + if err := s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) { + entry.Attributes = dstEntry.Attributes + entry.Extended = dstEntry.Extended + }); err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - } - // Create the new file - if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) { - entry.Attributes = dstEntry.Attributes - entry.Extended = dstEntry.Extended - }); err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } + // Update the .versions directory metadata + err = s3a.updateLatestVersionInDirectory(dstBucket, dstObject, dstVersionId, versionFileName) + if err != nil { + glog.Errorf("CopyObjectHandler: failed to update latest version in directory: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } - // Convert filer_pb.Entry to filer.Entry for ETag calculation - filerEntry := &filer.Entry{ - FullPath: dstPath, - Attr: filer.Attr{ - FileSize: dstEntry.Attributes.FileSize, - Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), - Crtime: time.Unix(dstEntry.Attributes.Crtime, 0), - Mime: dstEntry.Attributes.Mime, - }, - Chunks: dstEntry.Chunks, + // Set version ID in response header + w.Header().Set("x-amz-version-id", dstVersionId) + } else { + // For non-versioned destination, use regular copy + dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) + dstDir, dstName := dstPath.DirAndName() + + // Check if destination exists and remove it first (S3 copy overwrites) + if exists, _ := s3a.exists(dstDir, dstName, false); exists { + if err := s3a.rm(dstDir, dstName, false, false); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + } + + // Create the new file + if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) { + entry.Attributes = dstEntry.Attributes + entry.Extended = dstEntry.Extended + }); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + // Calculate ETag + filerEntry := &filer.Entry{ + FullPath: dstPath, + Attr: filer.Attr{ + FileSize: dstEntry.Attributes.FileSize, + Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), + Crtime: time.Unix(dstEntry.Attributes.Crtime, 0), + Mime: dstEntry.Attributes.Mime, + }, + Chunks: dstEntry.Chunks, + } + etag = filer.ETagEntry(filerEntry) } - setEtag(w, filer.ETagEntry(filerEntry)) + setEtag(w, etag) response := CopyObjectResult{ - ETag: filer.ETagEntry(filerEntry), + ETag: etag, LastModified: time.Now().UTC(), } @@ -191,6 +288,18 @@ func pathToBucketAndObject(path string) (bucket, object string) { return parts[0], "/" } +func pathToBucketObjectAndVersion(path string) (bucket, object, versionId string) { + // Parse versionId from query string if present + // Format: /bucket/object?versionId=version-id + if idx := strings.Index(path, "?versionId="); idx != -1 { + versionId = path[idx+len("?versionId="):] // dynamically calculate length + path = path[:idx] + } + + bucket, object = pathToBucketAndObject(path) + return bucket, object, versionId +} + type CopyPartResult struct { LastModified time.Time `xml:"LastModified"` ETag string `xml:"ETag"` @@ -208,7 +317,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req cpSrcPath = r.Header.Get("X-Amz-Copy-Source") } - srcBucket, srcObject := pathToBucketAndObject(cpSrcPath) + srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath) // If source object is empty or bucket is empty, reply back invalid copy source. if srcObject == "" || srcBucket == "" { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) @@ -239,10 +348,40 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req return } - // Get source entry - srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) - dir, name := srcPath.DirAndName() - entry, err := s3a.getEntry(dir, name) + // Get detailed versioning state for source bucket + srcVersioningState, err := s3a.getVersioningState(srcBucket) + if err != nil { + glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return + } + + // Get the source entry with version awareness based on versioning state + var entry *filer_pb.Entry + if srcVersionId != "" { + // Specific version requested - always use version-aware retrieval + entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId) + } else if srcVersioningState == s3_constants.VersioningEnabled { + // Versioning enabled - get latest version from .versions directory + entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) + } else if srcVersioningState == s3_constants.VersioningSuspended { + // Versioning suspended - current object is stored as regular file ("null" version) + // Try regular file first, fall back to latest version if needed + srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) + dir, name := srcPath.DirAndName() + entry, err = s3a.getEntry(dir, name) + if err != nil { + // If regular file doesn't exist, try latest version as fallback + glog.V(2).Infof("CopyObjectPart: regular file not found for suspended versioning, trying latest version") + entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) + } + } else { + // No versioning configured - use regular retrieval + srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) + dir, name := srcPath.DirAndName() + entry, err = s3a.getEntry(dir, name) + } + if err != nil || entry.IsDirectory { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return |
