diff options
Diffstat (limited to 'weed/s3api/filer_multipart.go')
| -rw-r--r-- | weed/s3api/filer_multipart.go | 73 |
1 files changed, 66 insertions, 7 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index d517c188b..c6f1a481c 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -247,13 +247,72 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa return nil, s3err.ErrInternalError } - output = &CompleteMultipartUploadResult{ - CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), - Bucket: input.Bucket, - ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), - Key: objectKey(input.Key), - }, + // Check if versioning is enabled for this bucket + versioningEnabled, vErr := s3a.isVersioningEnabled(*input.Bucket) + if vErr == nil && versioningEnabled { + // For versioned buckets, create a version and return the version ID + versionId := generateVersionId() + versionFileName := s3a.getVersionFileName(versionId) + versionDir := dirName + "/" + entryName + ".versions" + + // Move the completed object to the versions directory + err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) { + if versionEntry.Extended == nil { + versionEntry.Extended = make(map[string][]byte) + } + versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) + versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) + for k, v := range pentry.Extended { + if k != "key" { + versionEntry.Extended[k] = v + } + } + if pentry.Attributes.Mime != "" { + versionEntry.Attributes.Mime = pentry.Attributes.Mime + } else if mime != "" { + versionEntry.Attributes.Mime = mime + } + versionEntry.Attributes.FileSize = uint64(offset) + }) + + if err != nil { + glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err) + return nil, s3err.ErrInternalError + } + + // Create a delete marker for the main object (latest version) + err = s3a.mkFile(dirName, entryName, nil, func(mainEntry *filer_pb.Entry) { + if mainEntry.Extended == nil { + mainEntry.Extended = make(map[string][]byte) + } + mainEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) + mainEntry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("false") // This is the latest version, not a delete marker + }) + + if err != nil { + glog.Errorf("completeMultipartUpload: failed to update main entry: %v", err) + return nil, s3err.ErrInternalError + } + + output = &CompleteMultipartUploadResult{ + CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Bucket: input.Bucket, + ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), + Key: objectKey(input.Key), + VersionId: aws.String(versionId), + }, + } + } else { + // For non-versioned buckets, return response without VersionId + output = &CompleteMultipartUploadResult{ + CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Bucket: input.Bucket, + ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), + Key: objectKey(input.Key), + }, + } } for _, deleteEntry := range deleteEntries { |
