diff options
Diffstat (limited to 'weed/s3api/filer_multipart.go')
| -rw-r--r-- | weed/s3api/filer_multipart.go | 100 |
1 files changed, 88 insertions, 12 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index d517c188b..05d167333 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -21,6 +21,8 @@ import ( "github.com/google/uuid" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "net/http" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -36,7 +38,7 @@ type InitiateMultipartUploadResult struct { s3.CreateMultipartUploadOutput } -func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) { +func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) { glog.V(2).Infof("createMultipartUpload input %v", input) @@ -55,6 +57,13 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp if input.ContentType != nil { entry.Attributes.Mime = *input.ContentType } + + // Extract and store object lock metadata from request headers + // This ensures object lock settings from create_multipart_upload are preserved + if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil { + glog.Errorf("createMultipartUpload: failed to extract object lock metadata: %v", err) + // Don't fail the upload - this matches AWS behavior for invalid metadata + } }); err != nil { glog.Errorf("NewMultipartUpload error: %v", err) return nil, s3err.ErrInternalError @@ -72,8 +81,15 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp } type CompleteMultipartUploadResult struct { - XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"` - s3.CompleteMultipartUploadOutput + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"` + Location *string `xml:"Location,omitempty"` + Bucket *string `xml:"Bucket,omitempty"` + Key *string `xml:"Key,omitempty"` + ETag *string `xml:"ETag,omitempty"` + // VersionId is NOT included in XML body - it should only be in x-amz-version-id HTTP header + + // Store the VersionId internally for setting HTTP header, but don't marshal to XML + VersionId *string `xml:"-"` } func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { @@ -110,12 +126,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil { if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) { return &CompleteMultipartUploadResult{ - CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), - Bucket: input.Bucket, - ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""), - Key: objectKey(input.Key), - }, + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Bucket: input.Bucket, + ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""), + Key: objectKey(input.Key), }, s3err.ErrNone } } @@ -247,13 +261,75 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa return nil, s3err.ErrInternalError } - output = &CompleteMultipartUploadResult{ - CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ + // Check if versioning is enabled for this bucket + versioningEnabled, vErr := s3a.isVersioningEnabled(*input.Bucket) + if vErr == nil && versioningEnabled { + // For versioned buckets, create a version and return the version ID + versionId := generateVersionId() + versionFileName := s3a.getVersionFileName(versionId) + versionDir := dirName + "/" + entryName + ".versions" + + // Move the completed object to the versions directory + err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) { + if versionEntry.Extended == nil { + versionEntry.Extended = make(map[string][]byte) + } + versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) + versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) + for k, v := range pentry.Extended { + if k != "key" { + versionEntry.Extended[k] = v + } + } + if pentry.Attributes.Mime != "" { + versionEntry.Attributes.Mime = pentry.Attributes.Mime + } else if mime != "" { + versionEntry.Attributes.Mime = mime + } + versionEntry.Attributes.FileSize = uint64(offset) + }) + + if err != nil { + glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err) + return nil, s3err.ErrInternalError + } + + // Update the .versions directory metadata to indicate this is the latest version + err = s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName) + if err != nil { + glog.Errorf("completeMultipartUpload: failed to update latest version in directory: %v", err) + return nil, s3err.ErrInternalError + } + + // Create a delete marker for the main object (latest version) + err = s3a.mkFile(dirName, entryName, nil, func(mainEntry *filer_pb.Entry) { + if mainEntry.Extended == nil { + mainEntry.Extended = make(map[string][]byte) + } + mainEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) + mainEntry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("false") // This is the latest version, not a delete marker + }) + + if err != nil { + glog.Errorf("completeMultipartUpload: failed to update main entry: %v", err) + return nil, s3err.ErrInternalError + } + + output = &CompleteMultipartUploadResult{ + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Bucket: input.Bucket, + ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), + Key: objectKey(input.Key), + VersionId: aws.String(versionId), + } + } else { + // For non-versioned buckets, return response without VersionId + output = &CompleteMultipartUploadResult{ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), Key: objectKey(input.Key), - }, + } } for _, deleteEntry := range deleteEntries { |
