aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_copy.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_copy.go')
-rw-r--r--weed/s3api/s3api_object_handlers_copy.go213
1 files changed, 176 insertions, 37 deletions
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go
index 9ce8a6377..888b38e94 100644
--- a/weed/s3api/s3api_object_handlers_copy.go
+++ b/weed/s3api/s3api_object_handlers_copy.go
@@ -38,9 +38,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
}
- srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
+ srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath)
- glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject)
+ glog.V(3).Infof("CopyObjectHandler %s %s (version: %s) => %s %s", srcBucket, srcObject, srcVersionId, dstBucket, dstObject)
replaceMeta, replaceTagging := replaceDirective(r.Header)
@@ -76,9 +76,41 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
- srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
- dir, name := srcPath.DirAndName()
- entry, err := s3a.getEntry(dir, name)
+
+ // Get detailed versioning state for source bucket
+ srcVersioningState, err := s3a.getVersioningState(srcBucket)
+ if err != nil {
+ glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
+ return
+ }
+
+ // Get the source entry with version awareness based on versioning state
+ var entry *filer_pb.Entry
+ if srcVersionId != "" {
+ // Specific version requested - always use version-aware retrieval
+ entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId)
+ } else if srcVersioningState == s3_constants.VersioningEnabled {
+ // Versioning enabled - get latest version from .versions directory
+ entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
+ } else if srcVersioningState == s3_constants.VersioningSuspended {
+ // Versioning suspended - current object is stored as regular file ("null" version)
+ // Try regular file first, fall back to latest version if needed
+ srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
+ dir, name := srcPath.DirAndName()
+ entry, err = s3a.getEntry(dir, name)
+ if err != nil {
+ // If regular file doesn't exist, try latest version as fallback
+ glog.V(2).Infof("CopyObject: regular file not found for suspended versioning, trying latest version")
+ entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
+ }
+ } else {
+ // No versioning configured - use regular retrieval
+ srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
+ dir, name := srcPath.DirAndName()
+ entry, err = s3a.getEntry(dir, name)
+ }
+
if err != nil || entry.IsDirectory {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
@@ -138,43 +170,108 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
dstEntry.Chunks = dstChunks
}
- // Save the new entry
- dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
- dstDir, dstName := dstPath.DirAndName()
+ // Check if destination bucket has versioning configured
+ dstVersioningConfigured, err := s3a.isVersioningConfigured(dstBucket)
+ if err != nil {
+ glog.Errorf("Error checking versioning status for destination bucket %s: %v", dstBucket, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+
+ var dstVersionId string
+ var etag string
+
+ if dstVersioningConfigured {
+ // For versioned destination, create a new version
+ dstVersionId = generateVersionId()
+ glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject)
- // Check if destination exists and remove it first (S3 copy overwrites)
- if exists, _ := s3a.exists(dstDir, dstName, false); exists {
- if err := s3a.rm(dstDir, dstName, false, false); err != nil {
+ // Add version metadata to the entry
+ if dstEntry.Extended == nil {
+ dstEntry.Extended = make(map[string][]byte)
+ }
+ dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(dstVersionId)
+
+ // Calculate ETag for versioning
+ filerEntry := &filer.Entry{
+ FullPath: util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)),
+ Attr: filer.Attr{
+ FileSize: dstEntry.Attributes.FileSize,
+ Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
+ Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
+ Mime: dstEntry.Attributes.Mime,
+ },
+ Chunks: dstEntry.Chunks,
+ }
+ etag = filer.ETagEntry(filerEntry)
+ if !strings.HasPrefix(etag, "\"") {
+ etag = "\"" + etag + "\""
+ }
+ dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
+
+ // Create version file
+ versionFileName := s3a.getVersionFileName(dstVersionId)
+ versionObjectPath := dstObject + ".versions/" + versionFileName
+ bucketDir := s3a.option.BucketsPath + "/" + dstBucket
+
+ if err := s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) {
+ entry.Attributes = dstEntry.Attributes
+ entry.Extended = dstEntry.Extended
+ }); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
- }
- // Create the new file
- if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
- entry.Attributes = dstEntry.Attributes
- entry.Extended = dstEntry.Extended
- }); err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
+ // Update the .versions directory metadata
+ err = s3a.updateLatestVersionInDirectory(dstBucket, dstObject, dstVersionId, versionFileName)
+ if err != nil {
+ glog.Errorf("CopyObjectHandler: failed to update latest version in directory: %v", err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
- // Convert filer_pb.Entry to filer.Entry for ETag calculation
- filerEntry := &filer.Entry{
- FullPath: dstPath,
- Attr: filer.Attr{
- FileSize: dstEntry.Attributes.FileSize,
- Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
- Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
- Mime: dstEntry.Attributes.Mime,
- },
- Chunks: dstEntry.Chunks,
+ // Set version ID in response header
+ w.Header().Set("x-amz-version-id", dstVersionId)
+ } else {
+ // For non-versioned destination, use regular copy
+ dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
+ dstDir, dstName := dstPath.DirAndName()
+
+ // Check if destination exists and remove it first (S3 copy overwrites)
+ if exists, _ := s3a.exists(dstDir, dstName, false); exists {
+ if err := s3a.rm(dstDir, dstName, false, false); err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+ }
+
+ // Create the new file
+ if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
+ entry.Attributes = dstEntry.Attributes
+ entry.Extended = dstEntry.Extended
+ }); err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+
+ // Calculate ETag
+ filerEntry := &filer.Entry{
+ FullPath: dstPath,
+ Attr: filer.Attr{
+ FileSize: dstEntry.Attributes.FileSize,
+ Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
+ Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
+ Mime: dstEntry.Attributes.Mime,
+ },
+ Chunks: dstEntry.Chunks,
+ }
+ etag = filer.ETagEntry(filerEntry)
}
- setEtag(w, filer.ETagEntry(filerEntry))
+ setEtag(w, etag)
response := CopyObjectResult{
- ETag: filer.ETagEntry(filerEntry),
+ ETag: etag,
LastModified: time.Now().UTC(),
}
@@ -191,6 +288,18 @@ func pathToBucketAndObject(path string) (bucket, object string) {
return parts[0], "/"
}
+func pathToBucketObjectAndVersion(path string) (bucket, object, versionId string) {
+ // Parse versionId from query string if present
+ // Format: /bucket/object?versionId=version-id
+ if idx := strings.Index(path, "?versionId="); idx != -1 {
+ versionId = path[idx+len("?versionId="):] // dynamically calculate length
+ path = path[:idx]
+ }
+
+ bucket, object = pathToBucketAndObject(path)
+ return bucket, object, versionId
+}
+
type CopyPartResult struct {
LastModified time.Time `xml:"LastModified"`
ETag string `xml:"ETag"`
@@ -208,7 +317,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
}
- srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
+ srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath)
// If source object is empty or bucket is empty, reply back invalid copy source.
if srcObject == "" || srcBucket == "" {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
@@ -239,10 +348,40 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
return
}
- // Get source entry
- srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
- dir, name := srcPath.DirAndName()
- entry, err := s3a.getEntry(dir, name)
+ // Get detailed versioning state for source bucket
+ srcVersioningState, err := s3a.getVersioningState(srcBucket)
+ if err != nil {
+ glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
+ return
+ }
+
+ // Get the source entry with version awareness based on versioning state
+ var entry *filer_pb.Entry
+ if srcVersionId != "" {
+ // Specific version requested - always use version-aware retrieval
+ entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId)
+ } else if srcVersioningState == s3_constants.VersioningEnabled {
+ // Versioning enabled - get latest version from .versions directory
+ entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
+ } else if srcVersioningState == s3_constants.VersioningSuspended {
+ // Versioning suspended - current object is stored as regular file ("null" version)
+ // Try regular file first, fall back to latest version if needed
+ srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
+ dir, name := srcPath.DirAndName()
+ entry, err = s3a.getEntry(dir, name)
+ if err != nil {
+ // If regular file doesn't exist, try latest version as fallback
+ glog.V(2).Infof("CopyObjectPart: regular file not found for suspended versioning, trying latest version")
+ entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
+ }
+ } else {
+ // No versioning configured - use regular retrieval
+ srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
+ dir, name := srcPath.DirAndName()
+ entry, err = s3a.getEntry(dir, name)
+ }
+
if err != nil || entry.IsDirectory {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return