diff options
| author | guosj <515878133@qq.com> | 2022-04-19 09:25:32 +0800 |
|---|---|---|
| committer | guosj <515878133@qq.com> | 2022-04-19 09:25:32 +0800 |
| commit | 82ee31965dd7a1ad2d348c7e9dadb254744bf9b0 (patch) | |
| tree | 593eb933dffc877010c761b2c55ec6c73875e9a3 /weed/s3api | |
| parent | 5c9a3bb8cf68ed99acb53dd548c92b54744d7fd7 (diff) | |
| parent | e6ebafc094dc0ce0e3b0a68d7735f52a544bc479 (diff) | |
| download | seaweedfs-82ee31965dd7a1ad2d348c7e9dadb254744bf9b0.tar.xz seaweedfs-82ee31965dd7a1ad2d348c7e9dadb254744bf9b0.zip | |
Merge branch 'master' of https://github.com/chrislusf/seaweedfs into chrislusf-master
Diffstat (limited to 'weed/s3api')
| -rw-r--r-- | weed/s3api/filer_multipart.go | 27 | ||||
| -rw-r--r-- | weed/s3api/filer_multipart_test.go | 6 | ||||
| -rw-r--r-- | weed/s3api/s3api_bucket_handlers.go | 20 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 17 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_multipart_handlers.go | 44 | ||||
| -rw-r--r-- | weed/s3api/s3api_server.go | 17 | ||||
| -rw-r--r-- | weed/s3api/s3err/s3api_errors.go | 6 |
7 files changed, 114 insertions, 23 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 5a039382b..659cf4c96 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -1,9 +1,11 @@ package s3api import ( + "encoding/hex" "encoding/xml" "fmt" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "golang.org/x/exp/slices" "path/filepath" "sort" "strconv" @@ -12,7 +14,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" - "github.com/google/uuid" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -28,8 +29,7 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp glog.V(2).Infof("createMultipartUpload input %v", input) - uploadId, _ := uuid.NewRandom() - uploadIdString := uploadId.String() + uploadIdString := s3a.generateUploadID(*input.Key) if err := s3a.mkdir(s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) { if entry.Extended == nil { @@ -68,8 +68,8 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa glog.V(2).Infof("completeMultipartUpload input %v", input) completedParts := parts.Parts - sort.Slice(completedParts, func(i, j int) bool { - return completedParts[i].PartNumber < completedParts[j].PartNumber + slices.SortFunc(completedParts, func(a, b CompletedPart) bool { + return a.PartNumber < b.PartNumber }) uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId @@ -93,10 +93,15 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa for _, entry := range entries { if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { - _, found := findByPartNumber(entry.Name, completedParts) + partETag, found := findByPartNumber(entry.Name, completedParts) if !found { continue } + entryETag := hex.EncodeToString(entry.Attributes.GetMd5()) + if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag { + glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag) + return nil, s3err.ErrInvalidPart + } for _, chunk := range entry.Chunks { p := &filer_pb.FileChunk{ FileId: chunk.GetFileIdString(), @@ -175,7 +180,15 @@ func findByPartNumber(fileName string, parts []CompletedPart) (etag string, foun if parts[x].PartNumber != partNumber { return } - return parts[x].ETag, true + y := 0 + for i, part := range parts[x:] { + if part.PartNumber == partNumber { + y = i + } else { + break + } + } + return parts[x+y].ETag, true } func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) { diff --git a/weed/s3api/filer_multipart_test.go b/weed/s3api/filer_multipart_test.go index 52425b5b2..fe2b9c0ce 100644 --- a/weed/s3api/filer_multipart_test.go +++ b/weed/s3api/filer_multipart_test.go @@ -62,6 +62,10 @@ func Test_findByPartNumber(t *testing.T) { PartNumber: 1, }, CompletedPart{ + ETag: "lll", + PartNumber: 1, + }, + CompletedPart{ ETag: "yyy", PartNumber: 3, }, @@ -83,7 +87,7 @@ func Test_findByPartNumber(t *testing.T) { "0001.part", parts, }, - "xxx", + "lll", true, }, { diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 3d35e5216..7de1d5ebb 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -3,6 +3,7 @@ package s3api import ( "context" "encoding/xml" + "errors" "fmt" "math" "net/http" @@ -134,6 +135,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } + w.Header().Set("Location", "/" + bucket) writeSuccessResponseEmpty(w, r) } @@ -148,6 +150,15 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque } err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + if !s3a.option.AllowDeleteBucketNotEmpty { + entries, _, err := s3a.list(s3a.option.BucketsPath+"/"+bucket, "", "", false, 1) + if err != nil { + return fmt.Errorf("failed to list bucket %s: %v", bucket, err) + } + if len(entries) > 0 { + return errors.New(s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code) + } + } // delete collection deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{ @@ -162,6 +173,15 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque return nil }) + if err != nil { + s3ErrorCode := s3err.ErrInternalError + if err.Error() == s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code { + s3ErrorCode = s3err.ErrBucketNotEmpty + } + s3err.WriteErrorResponse(w, r, s3ErrorCode) + return + } + err = s3a.rm(s3a.option.BucketsPath, bucket, false, true) if err != nil { diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 6bcf2266f..3d26d395e 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -8,10 +8,10 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util/mem" + "golang.org/x/exp/slices" "io" "net/http" "net/url" - "sort" "strings" "time" @@ -27,6 +27,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +const ( + deleteMultipleObjectsLimmit = 1000 +) + func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { mimeBuffer := make([]byte, 512) size, _ := dataReader.Read(mimeBuffer) @@ -59,7 +63,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) if r.Header.Get("Expires") != "" { if _, err = time.Parse(http.TimeFormat, r.Header.Get("Expires")); err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest) + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedExpires) return } } @@ -217,6 +221,11 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h return } + if len(deleteObjects.Objects) > deleteMultipleObjectsLimmit { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxDeleteObjects) + return + } + var deletedObjects []ObjectIdentifier var deleteErrors []DeleteError var auditLog *s3err.AccessLog @@ -281,8 +290,8 @@ func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerCli for dir, _ := range directoriesWithDeletion { allDirs = append(allDirs, dir) } - sort.Slice(allDirs, func(i, j int) bool { - return len(allDirs[i]) > len(allDirs[j]) + slices.SortFunc(allDirs, func(a, b string) bool { + return len(a) > len(b) }) newDirectoriesWithDeletion = make(map[string]int) for _, dir := range allDirs { diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go index 35bc174c8..d2fa21c2e 100644 --- a/weed/s3api/s3api_object_multipart_handlers.go +++ b/weed/s3api/s3api_object_multipart_handlers.go @@ -2,6 +2,7 @@ package s3api import ( "encoding/xml" + "crypto/sha1" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" @@ -70,6 +71,11 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r // Get upload id. uploadID, _, _, _ := getObjectResources(r.URL.Query()) + err := s3a.checkUploadId(object, uploadID) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) + return + } response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{ Bucket: aws.String(bucket), @@ -94,6 +100,11 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht // Get upload id. uploadID, _, _, _ := getObjectResources(r.URL.Query()) + err := s3a.checkUploadId(object, uploadID) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) + return + } response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{ Bucket: aws.String(bucket), @@ -165,6 +176,12 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re return } + err := s3a.checkUploadId(object, uploadID) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) + return + } + response, errCode := s3a.listObjectParts(&s3.ListPartsInput{ Bucket: aws.String(bucket), Key: objectKey(aws.String(object)), @@ -186,11 +203,11 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re // PutObjectPartHandler - Put an object part in a multipart upload. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { - bucket, _ := xhttp.GetBucketAndObject(r) + bucket, object := xhttp.GetBucketAndObject(r) uploadID := r.URL.Query().Get("uploadId") - exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true) - if !exists { + err := s3a.checkUploadId(object, uploadID) + if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) return } @@ -250,6 +267,27 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string { return fmt.Sprintf("%s/%s/.uploads", s3a.option.BucketsPath, bucket) } +// Generate uploadID hash string from object +func (s3a *S3ApiServer) generateUploadID(object string) string { + if strings.HasPrefix(object, "/") { + object = object[1:] + } + h := sha1.New() + h.Write([]byte(object)) + return fmt.Sprintf("%x", h.Sum(nil)) +} + +//Check object name and uploadID when processing multipart uploading +func (s3a *S3ApiServer) checkUploadId(object string, id string) error { + + hash := s3a.generateUploadID(object) + if hash != id { + glog.Errorf("object %s and uploadID %s are not matched", object, id) + return fmt.Errorf("object %s and uploadID %s are not matched", object, id) + } + return nil +} + // Parse bucket url queries for ?uploads func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) { prefix = values.Get("prefix") diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index fe069595d..561edd924 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -19,14 +19,15 @@ import ( ) type S3ApiServerOption struct { - Filer pb.ServerAddress - Port int - Config string - DomainName string - BucketsPath string - GrpcDialOption grpc.DialOption - AllowEmptyFolder bool - LocalFilerSocket *string + Filer pb.ServerAddress + Port int + Config string + DomainName string + BucketsPath string + GrpcDialOption grpc.DialOption + AllowEmptyFolder bool + AllowDeleteBucketNotEmpty bool + LocalFilerSocket *string } type S3ApiServer struct { diff --git a/weed/s3api/s3err/s3api_errors.go b/weed/s3api/s3err/s3api_errors.go index f4a83d979..52803f398 100644 --- a/weed/s3api/s3err/s3api_errors.go +++ b/weed/s3api/s3err/s3api_errors.go @@ -61,6 +61,7 @@ const ( ErrInvalidMaxKeys ErrInvalidMaxUploads ErrInvalidMaxParts + ErrInvalidMaxDeleteObjects ErrInvalidPartNumberMarker ErrInvalidPart ErrInternalError @@ -157,6 +158,11 @@ var errorCodeResponse = map[ErrorCode]APIError{ Description: "Argument max-parts must be an integer between 0 and 2147483647", HTTPStatusCode: http.StatusBadRequest, }, + ErrInvalidMaxDeleteObjects: { + Code: "InvalidArgument", + Description: "Argument objects can contain a list of up to 1000 keys", + HTTPStatusCode: http.StatusBadRequest, + }, ErrInvalidPartNumberMarker: { Code: "InvalidArgument", Description: "Argument partNumberMarker must be an integer.", |
