aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/filer_multipart.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/filer_multipart.go')
-rw-r--r--weed/s3api/filer_multipart.go98
1 files changed, 61 insertions, 37 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 05d167333..c7b2400f5 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -238,32 +238,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
}
entryName, dirName := s3a.getEntryNameAndDir(input)
- err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
- entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
- for k, v := range pentry.Extended {
- if k != "key" {
- entry.Extended[k] = v
- }
- }
- if pentry.Attributes.Mime != "" {
- entry.Attributes.Mime = pentry.Attributes.Mime
- } else if mime != "" {
- entry.Attributes.Mime = mime
- }
- entry.Attributes.FileSize = uint64(offset)
- })
- if err != nil {
- glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
- return nil, s3err.ErrInternalError
- }
-
- // Check if versioning is enabled for this bucket
- versioningEnabled, vErr := s3a.isVersioningEnabled(*input.Bucket)
- if vErr == nil && versioningEnabled {
+ // Check if versioning is configured for this bucket BEFORE creating any files
+ versioningState, vErr := s3a.getVersioningState(*input.Bucket)
+ if vErr == nil && versioningState == s3_constants.VersioningEnabled {
// For versioned buckets, create a version and return the version ID
versionId := generateVersionId()
versionFileName := s3a.getVersionFileName(versionId)
@@ -301,28 +279,74 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return nil, s3err.ErrInternalError
}
- // Create a delete marker for the main object (latest version)
- err = s3a.mkFile(dirName, entryName, nil, func(mainEntry *filer_pb.Entry) {
- if mainEntry.Extended == nil {
- mainEntry.Extended = make(map[string][]byte)
+ // For versioned buckets, don't create a main object file - all content is stored in .versions directory
+ // The latest version information is tracked in the .versions directory metadata
+
+ output = &CompleteMultipartUploadResult{
+ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Bucket: input.Bucket,
+ ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
+ Key: objectKey(input.Key),
+ VersionId: aws.String(versionId),
+ }
+ } else if vErr == nil && versioningState == s3_constants.VersioningSuspended {
+ // For suspended versioning, add "null" version ID metadata and return "null" version ID
+ err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+ entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
+ for k, v := range pentry.Extended {
+ if k != "key" {
+ entry.Extended[k] = v
+ }
+ }
+ if pentry.Attributes.Mime != "" {
+ entry.Attributes.Mime = pentry.Attributes.Mime
+ } else if mime != "" {
+ entry.Attributes.Mime = mime
}
- mainEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
- mainEntry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("false") // This is the latest version, not a delete marker
+ entry.Attributes.FileSize = uint64(offset)
})
if err != nil {
- glog.Errorf("completeMultipartUpload: failed to update main entry: %v", err)
+ glog.Errorf("completeMultipartUpload: failed to create suspended versioning object: %v", err)
return nil, s3err.ErrInternalError
}
+ // Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec
output = &CompleteMultipartUploadResult{
- Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
- Bucket: input.Bucket,
- ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
- Key: objectKey(input.Key),
- VersionId: aws.String(versionId),
+ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Bucket: input.Bucket,
+ ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
+ Key: objectKey(input.Key),
+ // VersionId field intentionally omitted for suspended versioning
}
} else {
+ // For non-versioned buckets, create main object file
+ err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+ entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
+ for k, v := range pentry.Extended {
+ if k != "key" {
+ entry.Extended[k] = v
+ }
+ }
+ if pentry.Attributes.Mime != "" {
+ entry.Attributes.Mime = pentry.Attributes.Mime
+ } else if mime != "" {
+ entry.Attributes.Mime = mime
+ }
+ entry.Attributes.FileSize = uint64(offset)
+ })
+
+ if err != nil {
+ glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
+ return nil, s3err.ErrInternalError
+ }
+
// For non-versioned buckets, return response without VersionId
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),