aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/filer_multipart.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/filer_multipart.go')
-rw-r--r--weed/s3api/filer_multipart.go100
1 files changed, 88 insertions, 12 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index d517c188b..05d167333 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -21,6 +21,8 @@ import (
"github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+ "net/http"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -36,7 +38,7 @@ type InitiateMultipartUploadResult struct {
s3.CreateMultipartUploadOutput
}
-func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
+func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("createMultipartUpload input %v", input)
@@ -55,6 +57,13 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
if input.ContentType != nil {
entry.Attributes.Mime = *input.ContentType
}
+
+ // Extract and store object lock metadata from request headers
+ // This ensures object lock settings from create_multipart_upload are preserved
+ if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
+ glog.Errorf("createMultipartUpload: failed to extract object lock metadata: %v", err)
+ // Don't fail the upload - this matches AWS behavior for invalid metadata
+ }
}); err != nil {
glog.Errorf("NewMultipartUpload error: %v", err)
return nil, s3err.ErrInternalError
@@ -72,8 +81,15 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
}
type CompleteMultipartUploadResult struct {
- XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"`
- s3.CompleteMultipartUploadOutput
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"`
+ Location *string `xml:"Location,omitempty"`
+ Bucket *string `xml:"Bucket,omitempty"`
+ Key *string `xml:"Key,omitempty"`
+ ETag *string `xml:"ETag,omitempty"`
+ // VersionId is NOT included in XML body - it should only be in x-amz-version-id HTTP header
+
+ // Store the VersionId internally for setting HTTP header, but don't marshal to XML
+ VersionId *string `xml:"-"`
}
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
@@ -110,12 +126,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil {
if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) {
return &CompleteMultipartUploadResult{
- CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
- Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
- Bucket: input.Bucket,
- ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
- Key: objectKey(input.Key),
- },
+ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Bucket: input.Bucket,
+ ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
+ Key: objectKey(input.Key),
}, s3err.ErrNone
}
}
@@ -247,13 +261,75 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return nil, s3err.ErrInternalError
}
- output = &CompleteMultipartUploadResult{
- CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
+ // Check if versioning is enabled for this bucket
+ versioningEnabled, vErr := s3a.isVersioningEnabled(*input.Bucket)
+ if vErr == nil && versioningEnabled {
+ // For versioned buckets, create a version and return the version ID
+ versionId := generateVersionId()
+ versionFileName := s3a.getVersionFileName(versionId)
+ versionDir := dirName + "/" + entryName + ".versions"
+
+ // Move the completed object to the versions directory
+ err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) {
+ if versionEntry.Extended == nil {
+ versionEntry.Extended = make(map[string][]byte)
+ }
+ versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
+ versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
+ for k, v := range pentry.Extended {
+ if k != "key" {
+ versionEntry.Extended[k] = v
+ }
+ }
+ if pentry.Attributes.Mime != "" {
+ versionEntry.Attributes.Mime = pentry.Attributes.Mime
+ } else if mime != "" {
+ versionEntry.Attributes.Mime = mime
+ }
+ versionEntry.Attributes.FileSize = uint64(offset)
+ })
+
+ if err != nil {
+ glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err)
+ return nil, s3err.ErrInternalError
+ }
+
+ // Update the .versions directory metadata to indicate this is the latest version
+ err = s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName)
+ if err != nil {
+ glog.Errorf("completeMultipartUpload: failed to update latest version in directory: %v", err)
+ return nil, s3err.ErrInternalError
+ }
+
+ // Create a delete marker for the main object (latest version)
+ err = s3a.mkFile(dirName, entryName, nil, func(mainEntry *filer_pb.Entry) {
+ if mainEntry.Extended == nil {
+ mainEntry.Extended = make(map[string][]byte)
+ }
+ mainEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
+ mainEntry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("false") // This is the latest version, not a delete marker
+ })
+
+ if err != nil {
+ glog.Errorf("completeMultipartUpload: failed to update main entry: %v", err)
+ return nil, s3err.ErrInternalError
+ }
+
+ output = &CompleteMultipartUploadResult{
+ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Bucket: input.Bucket,
+ ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
+ Key: objectKey(input.Key),
+ VersionId: aws.String(versionId),
+ }
+ } else {
+ // For non-versioned buckets, return response without VersionId
+ output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
- },
+ }
}
for _, deleteEntry := range deleteEntries {