diff options
Diffstat (limited to 'weed/s3api/filer_multipart.go')
| -rw-r--r-- | weed/s3api/filer_multipart.go | 98 |
1 files changed, 61 insertions, 37 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 05d167333..c7b2400f5 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -238,32 +238,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa } entryName, dirName := s3a.getEntryNameAndDir(input) - err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) { - if entry.Extended == nil { - entry.Extended = make(map[string][]byte) - } - entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) - for k, v := range pentry.Extended { - if k != "key" { - entry.Extended[k] = v - } - } - if pentry.Attributes.Mime != "" { - entry.Attributes.Mime = pentry.Attributes.Mime - } else if mime != "" { - entry.Attributes.Mime = mime - } - entry.Attributes.FileSize = uint64(offset) - }) - if err != nil { - glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err) - return nil, s3err.ErrInternalError - } - - // Check if versioning is enabled for this bucket - versioningEnabled, vErr := s3a.isVersioningEnabled(*input.Bucket) - if vErr == nil && versioningEnabled { + // Check if versioning is configured for this bucket BEFORE creating any files + versioningState, vErr := s3a.getVersioningState(*input.Bucket) + if vErr == nil && versioningState == s3_constants.VersioningEnabled { // For versioned buckets, create a version and return the version ID versionId := generateVersionId() versionFileName := s3a.getVersionFileName(versionId) @@ -301,28 +279,74 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa return nil, s3err.ErrInternalError } - // Create a delete marker for the main object (latest version) - err = s3a.mkFile(dirName, entryName, nil, func(mainEntry *filer_pb.Entry) { - if mainEntry.Extended == nil { - mainEntry.Extended = make(map[string][]byte) + // For versioned buckets, don't create a main object file - all content is stored in .versions directory + // The latest version information is tracked in the .versions directory metadata + + output = &CompleteMultipartUploadResult{ + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Bucket: input.Bucket, + ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), + Key: objectKey(input.Key), + VersionId: aws.String(versionId), + } + } else if vErr == nil && versioningState == s3_constants.VersioningSuspended { + // For suspended versioning, add "null" version ID metadata and return "null" version ID + err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) { + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") + for k, v := range pentry.Extended { + if k != "key" { + entry.Extended[k] = v + } + } + if pentry.Attributes.Mime != "" { + entry.Attributes.Mime = pentry.Attributes.Mime + } else if mime != "" { + entry.Attributes.Mime = mime } - mainEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) - mainEntry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("false") // This is the latest version, not a delete marker + entry.Attributes.FileSize = uint64(offset) }) if err != nil { - glog.Errorf("completeMultipartUpload: failed to update main entry: %v", err) + glog.Errorf("completeMultipartUpload: failed to create suspended versioning object: %v", err) return nil, s3err.ErrInternalError } + // Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec output = &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), - Bucket: input.Bucket, - ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), - Key: objectKey(input.Key), - VersionId: aws.String(versionId), + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Bucket: input.Bucket, + ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), + Key: objectKey(input.Key), + // VersionId field intentionally omitted for suspended versioning } } else { + // For non-versioned buckets, create main object file + err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) { + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) + for k, v := range pentry.Extended { + if k != "key" { + entry.Extended[k] = v + } + } + if pentry.Attributes.Mime != "" { + entry.Attributes.Mime = pentry.Attributes.Mime + } else if mime != "" { + entry.Attributes.Mime = mime + } + entry.Attributes.FileSize = uint64(offset) + }) + + if err != nil { + glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err) + return nil, s3err.ErrInternalError + } + // For non-versioned buckets, return response without VersionId output = &CompleteMultipartUploadResult{ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), |
