diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-09-09 16:25:43 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-09-09 16:25:43 -0700 |
| commit | 164091c269b1ff54c40329947cdf1dad37a527dc (patch) | |
| tree | 71bac8b171a7b6909293219f3ae6b13fff91c561 /weed/s3api | |
| parent | 9b3bf0e46c65ab8dfa980750cb1b805f08383df9 (diff) | |
| download | seaweedfs-164091c269b1ff54c40329947cdf1dad37a527dc.tar.xz seaweedfs-164091c269b1ff54c40329947cdf1dad37a527dc.zip | |
add s3 multipart upload
Diffstat (limited to 'weed/s3api')
| -rw-r--r-- | weed/s3api/filer_multipart.go | 113 | ||||
| -rw-r--r-- | weed/s3api/filer_util.go | 44 | ||||
| -rw-r--r-- | weed/s3api/s3api_bucket_handlers.go | 2 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_multipart_handlers.go | 2 | ||||
| -rw-r--r-- | weed/s3api/s3api_objects_list_handlers.go | 4 |
5 files changed, 152 insertions, 13 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index cd5b2e118..d9cafae26 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -1,11 +1,18 @@ package s3api import ( + "fmt" + "time" + "strings" + "strconv" + "path/filepath" + "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/aws" "github.com/satori/go.uuid" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/filer2" ) func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *s3.CreateMultipartUploadOutput, code ErrorCode) { @@ -32,19 +39,76 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp } func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput) (output *s3.CompleteMultipartUploadOutput, code ErrorCode) { + + uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId + + entries, err := s3a.list(uploadDirectory, "", "", false, 0) + if err != nil { + glog.Errorf("completeMultipartUpload %s *s error: %v", *input.Bucket, *input.UploadId, err) + return nil, ErrNoSuchUpload + } + + var finalParts []*filer_pb.FileChunk + var offset int64 + + for _, entry := range entries { + if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { + for _, chunk := range entry.Chunks { + finalParts = append(finalParts, &filer_pb.FileChunk{ + FileId: chunk.FileId, + Offset: offset, + Size: chunk.Size, + Mtime: chunk.Mtime, + ETag: chunk.ETag, + }) + offset += int64(chunk.Size) + } + } + } + + entryName := filepath.Base(*input.Key) + dirName := filepath.Dir(*input.Key) + if dirName == "." { + dirName = "" + } + dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName) + + err = s3a.mkFile(dirName, entryName, finalParts) + + if err != nil { + glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err) + return nil, ErrInternalError + } + + output = &s3.CompleteMultipartUploadOutput{ + Bucket: input.Bucket, + ETag: aws.String("\"" + filer2.ETag(finalParts) + "\""), + Key: input.Key, + } + return } func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code ErrorCode) { - return -} -func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *s3.ListMultipartUploadsOutput, code ErrorCode) { - entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)) + exists, err := s3a.exists(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true) if err != nil { - glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err) + glog.V(1).Infof("bucket %s abort upload %s: %v", *input.Bucket, *input.UploadId, err) return nil, ErrNoSuchUpload } + if exists { + err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, true, true) + } + if err != nil { + glog.V(1).Infof("bucket %s remove upload %s: %v", *input.Bucket, *input.UploadId, err) + return nil, ErrInternalError + } + + return &s3.AbortMultipartUploadOutput{}, ErrNone +} + +func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *s3.ListMultipartUploadsOutput, code ErrorCode) { + output = &s3.ListMultipartUploadsOutput{ Bucket: input.Bucket, Delimiter: input.Delimiter, @@ -53,6 +117,13 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput MaxUploads: input.MaxUploads, Prefix: input.Prefix, } + + entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads)) + if err != nil { + glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err) + return + } + for _, entry := range entries { if entry.Extended != nil { key := entry.Extended["key"] @@ -66,5 +137,37 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput } func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *s3.ListPartsOutput, code ErrorCode) { + output = &s3.ListPartsOutput{ + Bucket: input.Bucket, + Key: input.Key, + UploadId: input.UploadId, + MaxParts: input.MaxParts, // the maximum number of parts to return. + PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive + } + + entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, + "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, int(*input.MaxParts)) + if err != nil { + glog.Errorf("listObjectParts %s *s error: %v", *input.Bucket, *input.UploadId, err) + return nil, ErrNoSuchUpload + } + + for _, entry := range entries { + if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { + partNumberString := entry.Name[:len(entry.Name)-len(".part")] + partNumber, err := strconv.Atoi(partNumberString) + if err != nil { + glog.Errorf("listObjectParts %s *s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err) + continue + } + output.Parts = append(output.Parts, &s3.Part{ + PartNumber: aws.Int64(int64(partNumber)), + LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0)), + Size: aws.Int64(int64(filer2.TotalSize(entry.Chunks))), + ETag: aws.String("\"" + filer2.ETag(entry.Chunks) + "\""), + }) + } + } + return } diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index a44305505..f0949389c 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -34,7 +34,7 @@ func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn fun Entry: entry, } - glog.V(1).Infof("create bucket: %v", request) + glog.V(1).Infof("mkdir: %v", request) if _, err := client.CreateEntry(context.Background(), request); err != nil { return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err) } @@ -43,12 +43,46 @@ func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn fun }) } -func (s3a *S3ApiServer) list(parentDirectoryPath string) (entries []*filer_pb.Entry, err error) { +func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error { + return s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + entry := &filer_pb.Entry{ + Name: fileName, + IsDirectory: false, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0770), + Uid: OS_UID, + Gid: OS_GID, + }, + Chunks: chunks, + } + + request := &filer_pb.CreateEntryRequest{ + Directory: parentDirectoryPath, + Entry: entry, + } + + glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName) + if _, err := client.CreateEntry(context.Background(), request); err != nil { + return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err) + } + + return nil + }) +} + +func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit int) (entries []*filer_pb.Entry, err error) { err = s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.ListEntriesRequest{ - Directory: s3a.option.BucketsPath, + Directory: s3a.option.BucketsPath, + Prefix: prefix, + StartFromFileName: startFrom, + InclusiveStartFrom: inclusive, + Limit: uint32(limit), } glog.V(4).Infof("read directory: %v", request) @@ -101,10 +135,10 @@ func (s3a *S3ApiServer) exists(parentDirectoryPath string, entryName string, isD Name: entryName, } - glog.V(1).Infof("delete entry %v/%v: %v", parentDirectoryPath, entryName, request) + glog.V(1).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request) resp, err := client.LookupDirectoryEntry(ctx, request) if err != nil { - return fmt.Errorf("delete entry %s/%s: %v", parentDirectoryPath, entryName, err) + return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err) } exists = resp.Entry.IsDirectory == isDirectory diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 5257e67b4..df9abd451 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -21,7 +21,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques var response ListAllMyBucketsResponse - entries, err := s3a.list(s3a.option.BucketsPath) + entries, err := s3a.list(s3a.option.BucketsPath, "", "", false, 0) if err != nil { writeErrorResponse(w, ErrInternalError, r.URL) diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go index 62b702a39..042736856 100644 --- a/weed/s3api/s3api_object_multipart_handlers.go +++ b/weed/s3api/s3api_object_multipart_handlers.go @@ -143,6 +143,8 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht return } + // TODO handle encodingType + writeSuccessResponseXML(w, encodeResponse(response)) } diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 721982d0f..e104b4c9e 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -11,10 +11,10 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/gorilla/mux" + "github.com/chrislusf/seaweedfs/weed/filer2" ) const ( @@ -126,7 +126,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys contents = append(contents, &s3.Object{ Key: aws.String(fmt.Sprintf("%s%s", dir, entry.Name)), LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0)), - ETag: aws.String("\"2345sgfwetrewrt\""), // TODO add etag + ETag: aws.String("\"" + filer2.ETag(entry.Chunks) + "\""), Size: aws.Int64(int64(filer2.TotalSize(entry.Chunks))), Owner: &s3.Owner{ ID: aws.String("bcaf161ca5fb16fd081034f"), |
