aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/filer_multipart.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/filer_multipart.go')
-rw-r--r--weed/s3api/filer_multipart.go73
1 files changed, 66 insertions, 7 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index d517c188b..c6f1a481c 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -247,13 +247,72 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return nil, s3err.ErrInternalError
}
- output = &CompleteMultipartUploadResult{
- CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
- Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
- Bucket: input.Bucket,
- ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
- Key: objectKey(input.Key),
- },
+ // Check if versioning is enabled for this bucket
+ versioningEnabled, vErr := s3a.isVersioningEnabled(*input.Bucket)
+ if vErr == nil && versioningEnabled {
+ // For versioned buckets, create a version and return the version ID
+ versionId := generateVersionId()
+ versionFileName := s3a.getVersionFileName(versionId)
+ versionDir := dirName + "/" + entryName + ".versions"
+
+ // Move the completed object to the versions directory
+ err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) {
+ if versionEntry.Extended == nil {
+ versionEntry.Extended = make(map[string][]byte)
+ }
+ versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
+ versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
+ for k, v := range pentry.Extended {
+ if k != "key" {
+ versionEntry.Extended[k] = v
+ }
+ }
+ if pentry.Attributes.Mime != "" {
+ versionEntry.Attributes.Mime = pentry.Attributes.Mime
+ } else if mime != "" {
+ versionEntry.Attributes.Mime = mime
+ }
+ versionEntry.Attributes.FileSize = uint64(offset)
+ })
+
+ if err != nil {
+ glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err)
+ return nil, s3err.ErrInternalError
+ }
+
+ // Create a delete marker for the main object (latest version)
+ err = s3a.mkFile(dirName, entryName, nil, func(mainEntry *filer_pb.Entry) {
+ if mainEntry.Extended == nil {
+ mainEntry.Extended = make(map[string][]byte)
+ }
+ mainEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
+ mainEntry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("false") // This is the latest version, not a delete marker
+ })
+
+ if err != nil {
+ glog.Errorf("completeMultipartUpload: failed to update main entry: %v", err)
+ return nil, s3err.ErrInternalError
+ }
+
+ output = &CompleteMultipartUploadResult{
+ CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
+ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Bucket: input.Bucket,
+ ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
+ Key: objectKey(input.Key),
+ VersionId: aws.String(versionId),
+ },
+ }
+ } else {
+ // For non-versioned buckets, return response without VersionId
+ output = &CompleteMultipartUploadResult{
+ CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
+ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Bucket: input.Bucket,
+ ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
+ Key: objectKey(input.Key),
+ },
+ }
}
for _, deleteEntry := range deleteEntries {