diff options
Diffstat (limited to 'weed/s3api/filer_multipart.go')
| -rw-r--r-- | weed/s3api/filer_multipart.go | 161 |
1 files changed, 105 insertions, 56 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index d3bde66ee..f882592c1 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -1,9 +1,9 @@ package s3api import ( - "context" "encoding/xml" "fmt" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" "path/filepath" "strconv" "strings" @@ -11,10 +11,11 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/google/uuid" + + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/google/uuid" ) type InitiateMultipartUploadResult struct { @@ -22,18 +23,21 @@ type InitiateMultipartUploadResult struct { s3.CreateMultipartUploadOutput } -func (s3a *S3ApiServer) createMultipartUpload(ctx context.Context, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code ErrorCode) { +func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) { + + glog.V(2).Infof("createMultipartUpload input %v", input) + uploadId, _ := uuid.NewRandom() uploadIdString := uploadId.String() - if err := s3a.mkdir(ctx, s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) { + if err := s3a.mkdir(s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } entry.Extended["key"] = []byte(*input.Key) }); err != nil { glog.Errorf("NewMultipartUpload error: %v", err) - return nil, ErrInternalError + return nil, s3err.ErrInternalError } output = &InitiateMultipartUploadResult{ @@ -52,14 +56,16 @@ type CompleteMultipartUploadResult struct { s3.CompleteMultipartUploadOutput } -func (s3a *S3ApiServer) completeMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (output *CompleteMultipartUploadResult, code ErrorCode) { +func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { + + glog.V(2).Infof("completeMultipartUpload input %v", input) uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId - entries, err := s3a.list(ctx, uploadDirectory, "", "", false, 0) - if err != nil { - glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err) - return nil, ErrNoSuchUpload + entries, _, err := s3a.list(uploadDirectory, "", "", false, 0) + if err != nil || len(entries) == 0 { + glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries)) + return nil, s3err.ErrNoSuchUpload } var finalParts []*filer_pb.FileChunk @@ -69,11 +75,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(ctx context.Context, input *s3.C if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { for _, chunk := range entry.Chunks { p := &filer_pb.FileChunk{ - FileId: chunk.GetFileIdString(), - Offset: offset, - Size: chunk.Size, - Mtime: chunk.Mtime, - ETag: chunk.ETag, + FileId: chunk.GetFileIdString(), + Offset: offset, + Size: chunk.Size, + Mtime: chunk.Mtime, + CipherKey: chunk.CipherKey, + ETag: chunk.ETag, } finalParts = append(finalParts, p) offset += int64(chunk.Size) @@ -96,78 +103,103 @@ func (s3a *S3ApiServer) completeMultipartUpload(ctx context.Context, input *s3.C dirName = dirName[:len(dirName)-1] } - err = s3a.mkFile(ctx, dirName, entryName, finalParts) + err = s3a.mkFile(dirName, entryName, finalParts) if err != nil { glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err) - return nil, ErrInternalError + return nil, s3err.ErrInternalError } output = &CompleteMultipartUploadResult{ CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer, dirName, entryName)), Bucket: input.Bucket, - ETag: aws.String("\"" + filer2.ETag(finalParts) + "\""), + ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), Key: objectKey(input.Key), }, } - if err = s3a.rm(ctx, s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, false, true); err != nil { + if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil { glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err) } return } -func (s3a *S3ApiServer) abortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code ErrorCode) { +func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) { + + glog.V(2).Infof("abortMultipartUpload input %v", input) - exists, err := s3a.exists(ctx, s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true) + exists, err := s3a.exists(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true) if err != nil { glog.V(1).Infof("bucket %s abort upload %s: %v", *input.Bucket, *input.UploadId, err) - return nil, ErrNoSuchUpload + return nil, s3err.ErrNoSuchUpload } if exists { - err = s3a.rm(ctx, s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, true, true) + err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, true) } if err != nil { glog.V(1).Infof("bucket %s remove upload %s: %v", *input.Bucket, *input.UploadId, err) - return nil, ErrInternalError + return nil, s3err.ErrInternalError } - return &s3.AbortMultipartUploadOutput{}, ErrNone + return &s3.AbortMultipartUploadOutput{}, s3err.ErrNone } type ListMultipartUploadsResult struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult"` - s3.ListMultipartUploadsOutput + + // copied from s3.ListMultipartUploadsOutput, the Uploads is not converting to <Upload></Upload> + Bucket *string `type:"string"` + Delimiter *string `type:"string"` + EncodingType *string `type:"string" enum:"EncodingType"` + IsTruncated *bool `type:"boolean"` + KeyMarker *string `type:"string"` + MaxUploads *int64 `type:"integer"` + NextKeyMarker *string `type:"string"` + NextUploadIdMarker *string `type:"string"` + Prefix *string `type:"string"` + UploadIdMarker *string `type:"string"` + Upload []*s3.MultipartUpload `locationName:"Upload" type:"list" flattened:"true"` } -func (s3a *S3ApiServer) listMultipartUploads(ctx context.Context, input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code ErrorCode) { +func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code s3err.ErrorCode) { + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html + + glog.V(2).Infof("listMultipartUploads input %v", input) output = &ListMultipartUploadsResult{ - ListMultipartUploadsOutput: s3.ListMultipartUploadsOutput{ - Bucket: input.Bucket, - Delimiter: input.Delimiter, - EncodingType: input.EncodingType, - KeyMarker: input.KeyMarker, - MaxUploads: input.MaxUploads, - Prefix: input.Prefix, - }, + Bucket: input.Bucket, + Delimiter: input.Delimiter, + EncodingType: input.EncodingType, + KeyMarker: input.KeyMarker, + MaxUploads: input.MaxUploads, + Prefix: input.Prefix, } - entries, err := s3a.list(ctx, s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads)) + entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, uint32(*input.MaxUploads)) if err != nil { glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err) return } + output.IsTruncated = aws.Bool(!isLast) for _, entry := range entries { if entry.Extended != nil { - key := entry.Extended["key"] - output.Uploads = append(output.Uploads, &s3.MultipartUpload{ - Key: objectKey(aws.String(string(key))), + key := string(entry.Extended["key"]) + if *input.KeyMarker != "" && *input.KeyMarker != key { + continue + } + if *input.Prefix != "" && !strings.HasPrefix(key, *input.Prefix) { + continue + } + output.Upload = append(output.Upload, &s3.MultipartUpload{ + Key: objectKey(aws.String(key)), UploadId: aws.String(entry.Name), }) + if !isLast { + output.NextUploadIdMarker = aws.String(entry.Name) + } } } @@ -176,27 +208,41 @@ func (s3a *S3ApiServer) listMultipartUploads(ctx context.Context, input *s3.List type ListPartsResult struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult"` - s3.ListPartsOutput + + // copied from s3.ListPartsOutput, the Parts is not converting to <Part></Part> + Bucket *string `type:"string"` + IsTruncated *bool `type:"boolean"` + Key *string `min:"1" type:"string"` + MaxParts *int64 `type:"integer"` + NextPartNumberMarker *int64 `type:"integer"` + PartNumberMarker *int64 `type:"integer"` + Part []*s3.Part `locationName:"Part" type:"list" flattened:"true"` + StorageClass *string `type:"string" enum:"StorageClass"` + UploadId *string `type:"string"` } -func (s3a *S3ApiServer) listObjectParts(ctx context.Context, input *s3.ListPartsInput) (output *ListPartsResult, code ErrorCode) { +func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListPartsResult, code s3err.ErrorCode) { + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html + + glog.V(2).Infof("listObjectParts input %v", input) + output = &ListPartsResult{ - ListPartsOutput: s3.ListPartsOutput{ - Bucket: input.Bucket, - Key: objectKey(input.Key), - UploadId: input.UploadId, - MaxParts: input.MaxParts, // the maximum number of parts to return. - PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive - }, + Bucket: input.Bucket, + Key: objectKey(input.Key), + UploadId: input.UploadId, + MaxParts: input.MaxParts, // the maximum number of parts to return. + PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive + StorageClass: aws.String("STANDARD"), } - entries, err := s3a.list(ctx, s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, - "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, int(*input.MaxParts)) + entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts)) if err != nil { glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err) - return nil, ErrNoSuchUpload + return nil, s3err.ErrNoSuchUpload } + output.IsTruncated = aws.Bool(!isLast) + for _, entry := range entries { if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { partNumberString := entry.Name[:len(entry.Name)-len(".part")] @@ -205,12 +251,15 @@ func (s3a *S3ApiServer) listObjectParts(ctx context.Context, input *s3.ListParts glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err) continue } - output.Parts = append(output.Parts, &s3.Part{ + output.Part = append(output.Part, &s3.Part{ PartNumber: aws.Int64(int64(partNumber)), - LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0)), - Size: aws.Int64(int64(filer2.TotalSize(entry.Chunks))), - ETag: aws.String("\"" + filer2.ETag(entry.Chunks) + "\""), + LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0).UTC()), + Size: aws.Int64(int64(filer.FileSize(entry))), + ETag: aws.String("\"" + filer.ETag(entry) + "\""), }) + if !isLast { + output.NextPartNumberMarker = aws.Int64(int64(partNumber)) + } } } |
