diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/s3api | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'weed/s3api')
39 files changed, 2642 insertions, 520 deletions
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 44c3f7aa7..fb23d9ce9 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -2,17 +2,18 @@ package s3api import ( "fmt" + "net/http" + "os" + "strings" + "sync" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" - xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" - "io/ioutil" - "net/http" - "strings" ) type Action string @@ -22,8 +23,11 @@ type Iam interface { } type IdentityAccessManagement struct { - identities []*Identity - domain string + m sync.RWMutex + + identities []*Identity + isAuthEnabled bool + domain string } type Identity struct { @@ -37,6 +41,31 @@ type Credential struct { SecretKey string } +func (action Action) isAdmin() bool { + return strings.HasPrefix(string(action), s3_constants.ACTION_ADMIN) +} + +func (action Action) isOwner(bucket string) bool { + return string(action) == s3_constants.ACTION_ADMIN+":"+bucket +} + +func (action Action) overBucket(bucket string) bool { + return strings.HasSuffix(string(action), ":"+bucket) || strings.HasSuffix(string(action), ":*") +} + +func (action Action) getPermission() Permission { + switch act := strings.Split(string(action), ":")[0]; act { + case s3_constants.ACTION_ADMIN: + return Permission("FULL_CONTROL") + case s3_constants.ACTION_WRITE: + return Permission("WRITE") + case s3_constants.ACTION_READ: + return Permission("READ") + default: + return Permission("") + } +} + func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManagement { iam := &IdentityAccessManagement{ domain: option.DomainName, @@ -55,26 +84,26 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) { var content []byte - err = pb.WithFilerClient(option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { content, err = filer.ReadInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile) return err }) if err != nil { return fmt.Errorf("read S3 config: %v", err) } - return iam.loadS3ApiConfigurationFromBytes(content) + return iam.LoadS3ApiConfigurationFromBytes(content) } func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName string) error { - content, readErr := ioutil.ReadFile(fileName) + content, readErr := os.ReadFile(fileName) if readErr != nil { glog.Warningf("fail to read %s : %v", fileName, readErr) return fmt.Errorf("fail to read %s : %v", fileName, readErr) } - return iam.loadS3ApiConfigurationFromBytes(content) + return iam.LoadS3ApiConfigurationFromBytes(content) } -func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromBytes(content []byte) error { +func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromBytes(content []byte) error { s3ApiConfiguration := &iam_pb.S3ApiConfiguration{} if err := filer.ParseS3ConfigurationFromBytes(content, s3ApiConfiguration); err != nil { glog.Warningf("unmarshal error: %v", err) @@ -105,31 +134,39 @@ func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3Api } identities = append(identities, t) } - + iam.m.Lock() // atomically switch iam.identities = identities + if !iam.isAuthEnabled { // one-directional, no toggling + iam.isAuthEnabled = len(identities) > 0 + } + iam.m.Unlock() return nil } func (iam *IdentityAccessManagement) isEnabled() bool { - - return len(iam.identities) > 0 + return iam.isAuthEnabled } func (iam *IdentityAccessManagement) lookupByAccessKey(accessKey string) (identity *Identity, cred *Credential, found bool) { + iam.m.RLock() + defer iam.m.RUnlock() for _, ident := range iam.identities { for _, cred := range ident.Credentials { + // println("checking", ident.Name, cred.AccessKey) if cred.AccessKey == accessKey { return ident, cred, true } } } + glog.V(1).Infof("could not find accessKey %s", accessKey) return nil, nil, false } func (iam *IdentityAccessManagement) lookupAnonymous() (identity *Identity, found bool) { - + iam.m.RLock() + defer iam.m.RUnlock() for _, ident := range iam.identities { if ident.Name == "anonymous" { return ident, true @@ -139,24 +176,26 @@ func (iam *IdentityAccessManagement) lookupAnonymous() (identity *Identity, foun } func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) http.HandlerFunc { - - if !iam.isEnabled() { - return f - } - return func(w http.ResponseWriter, r *http.Request) { + if !iam.isEnabled() { + f(w, r) + return + } + identity, errCode := iam.authRequest(r, action) if errCode == s3err.ErrNone { if identity != nil && identity.Name != "" { - r.Header.Set(xhttp.AmzIdentityId, identity.Name) + r.Header.Set(s3_constants.AmzIdentityId, identity.Name) if identity.isAdmin() { - r.Header.Set(xhttp.AmzIsAdmin, "true") + r.Header.Set(s3_constants.AmzIsAdmin, "true") + } else if _, ok := r.Header[s3_constants.AmzIsAdmin]; ok { + r.Header.Del(s3_constants.AmzIsAdmin) } } f(w, r) return } - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) } } @@ -165,42 +204,53 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) var identity *Identity var s3Err s3err.ErrorCode var found bool + var authType string switch getRequestAuthType(r) { case authTypeStreamingSigned: return identity, s3err.ErrNone case authTypeUnknown: glog.V(3).Infof("unknown auth type") + r.Header.Set(s3_constants.AmzAuthType, "Unknown") return identity, s3err.ErrAccessDenied case authTypePresignedV2, authTypeSignedV2: glog.V(3).Infof("v2 auth type") identity, s3Err = iam.isReqAuthenticatedV2(r) + authType = "SigV2" case authTypeSigned, authTypePresigned: glog.V(3).Infof("v4 auth type") identity, s3Err = iam.reqSignatureV4Verify(r) + authType = "SigV4" case authTypePostPolicy: glog.V(3).Infof("post policy auth type") + r.Header.Set(s3_constants.AmzAuthType, "PostPolicy") return identity, s3err.ErrNone case authTypeJWT: glog.V(3).Infof("jwt auth type") + r.Header.Set(s3_constants.AmzAuthType, "Jwt") return identity, s3err.ErrNotImplemented case authTypeAnonymous: + authType = "Anonymous" identity, found = iam.lookupAnonymous() if !found { + r.Header.Set(s3_constants.AmzAuthType, authType) return identity, s3err.ErrAccessDenied } default: return identity, s3err.ErrNotImplemented } + if len(authType) > 0 { + r.Header.Set(s3_constants.AmzAuthType, authType) + } if s3Err != s3err.ErrNone { return identity, s3Err } glog.V(3).Infof("user name: %v actions: %v, action: %v", identity.Name, identity.Actions, action) - bucket, _ := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) - if !identity.canDo(action, bucket) { + if !identity.canDo(action, bucket, object) { return identity, s3err.ErrAccessDenied } @@ -212,33 +262,45 @@ func (iam *IdentityAccessManagement) authUser(r *http.Request) (*Identity, s3err var identity *Identity var s3Err s3err.ErrorCode var found bool + var authType string switch getRequestAuthType(r) { case authTypeStreamingSigned: return identity, s3err.ErrNone case authTypeUnknown: glog.V(3).Infof("unknown auth type") + r.Header.Set(s3_constants.AmzAuthType, "Unknown") return identity, s3err.ErrAccessDenied case authTypePresignedV2, authTypeSignedV2: glog.V(3).Infof("v2 auth type") identity, s3Err = iam.isReqAuthenticatedV2(r) + authType = "SigV2" case authTypeSigned, authTypePresigned: glog.V(3).Infof("v4 auth type") identity, s3Err = iam.reqSignatureV4Verify(r) + authType = "SigV4" case authTypePostPolicy: glog.V(3).Infof("post policy auth type") + r.Header.Set(s3_constants.AmzAuthType, "PostPolicy") return identity, s3err.ErrNone case authTypeJWT: glog.V(3).Infof("jwt auth type") + r.Header.Set(s3_constants.AmzAuthType, "Jwt") return identity, s3err.ErrNotImplemented case authTypeAnonymous: + authType = "Anonymous" identity, found = iam.lookupAnonymous() if !found { + r.Header.Set(s3_constants.AmzAuthType, authType) return identity, s3err.ErrAccessDenied } default: return identity, s3err.ErrNotImplemented } + if len(authType) > 0 { + r.Header.Set(s3_constants.AmzAuthType, authType) + } + glog.V(3).Infof("auth error: %v", s3Err) if s3Err != s3err.ErrNone { return identity, s3Err @@ -246,7 +308,7 @@ func (iam *IdentityAccessManagement) authUser(r *http.Request) (*Identity, s3err return identity, s3err.ErrNone } -func (identity *Identity) canDo(action Action, bucket string) bool { +func (identity *Identity) canDo(action Action, bucket string, objectKey string) bool { if identity.isAdmin() { return true } @@ -258,15 +320,17 @@ func (identity *Identity) canDo(action Action, bucket string) bool { if bucket == "" { return false } + target := string(action) + ":" + bucket + objectKey + adminTarget := s3_constants.ACTION_ADMIN + ":" + bucket + objectKey limitedByBucket := string(action) + ":" + bucket adminLimitedByBucket := s3_constants.ACTION_ADMIN + ":" + bucket for _, a := range identity.Actions { act := string(a) if strings.HasSuffix(act, "*") { - if strings.HasPrefix(limitedByBucket, act[:len(act)-1]) { + if strings.HasPrefix(target, act[:len(act)-1]) { return true } - if strings.HasPrefix(adminLimitedByBucket, act[:len(act)-1]) { + if strings.HasPrefix(adminTarget, act[:len(act)-1]) { return true } } else { diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index 05cce632a..f2bd94f56 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -5,10 +5,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "github.com/chrislusf/seaweedfs/weed/util" ) -func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error { +func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { @@ -22,18 +23,41 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la if message.NewParentPath != "" { dir = message.NewParentPath } - if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile { - if err := s3a.iam.loadS3ApiConfigurationFromBytes(message.NewEntry.Content); err != nil { - return err - } - glog.V(0).Infof("updated %s/%s", filer.IamConfigDirecotry, filer.IamIdentityFile) - } + fileName := message.NewEntry.Name + content := message.NewEntry.Content + + _ = s3a.onIamConfigUpdate(dir, fileName, content) + _ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content) return nil } - return util.Retry("followIamChanges", func() error { - return pb.WithFilerClientFollowMetadata(s3a, clientName, prefix, lastTsNs, 0, processEventFn, true) + util.RetryForever("followIamChanges", func() error { + return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, prefix, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError) + }, func(err error) bool { + glog.V(0).Infof("iam follow metadata changes: %v", err) + return true }) +} +//reload iam config +func (s3a *S3ApiServer) onIamConfigUpdate(dir, filename string, content []byte) error { + if dir == filer.IamConfigDirecotry && filename == filer.IamIdentityFile { + if err := s3a.iam.LoadS3ApiConfigurationFromBytes(content); err != nil { + return err + } + glog.V(0).Infof("updated %s/%s", dir, filename) + } + return nil +} + +//reload circuit breaker config +func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, content []byte) error { + if dir == s3_constants.CircuitBreakerConfigDir && filename == s3_constants.CircuitBreakerConfigFile { + if err := s3a.cb.LoadS3ApiConfigurationFromBytes(content); err != nil { + return err + } + glog.V(0).Infof("updated %s/%s", dir, filename) + } + return nil } diff --git a/weed/s3api/auth_credentials_test.go b/weed/s3api/auth_credentials_test.go index 0383ddbcd..4545d13bc 100644 --- a/weed/s3api/auth_credentials_test.go +++ b/weed/s3api/auth_credentials_test.go @@ -2,6 +2,7 @@ package s3api import ( . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/stretchr/testify/assert" "testing" "github.com/golang/protobuf/jsonpb" @@ -67,3 +68,61 @@ func TestIdentityListFileFormat(t *testing.T) { println(text) } + +func TestCanDo(t *testing.T) { + ident1 := &Identity{ + Name: "anything", + Actions: []Action{ + "Write:bucket1/a/b/c/*", + "Write:bucket1/a/b/other", + }, + } + // object specific + assert.Equal(t, true, ident1.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt")) + assert.Equal(t, false, ident1.canDo(ACTION_WRITE, "bucket1", "/a/b/other/some"), "action without *") + + // bucket specific + ident2 := &Identity{ + Name: "anything", + Actions: []Action{ + "Read:bucket1", + "Write:bucket1/*", + }, + } + assert.Equal(t, true, ident2.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt")) + assert.Equal(t, true, ident2.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt")) + assert.Equal(t, false, ident2.canDo(ACTION_LIST, "bucket1", "/a/b/c/d.txt")) + + // across buckets + ident3 := &Identity{ + Name: "anything", + Actions: []Action{ + "Read", + "Write", + }, + } + assert.Equal(t, true, ident3.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt")) + assert.Equal(t, true, ident3.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt")) + assert.Equal(t, false, ident3.canDo(ACTION_LIST, "bucket1", "/a/b/other/some")) + + // partial buckets + ident4 := &Identity{ + Name: "anything", + Actions: []Action{ + "Read:special_*", + }, + } + assert.Equal(t, true, ident4.canDo(ACTION_READ, "special_bucket", "/a/b/c/d.txt")) + assert.Equal(t, false, ident4.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt")) + + // admin buckets + ident5 := &Identity{ + Name: "anything", + Actions: []Action{ + "Admin:special_*", + }, + } + assert.Equal(t, true, ident5.canDo(ACTION_READ, "special_bucket", "/a/b/c/d.txt")) + assert.Equal(t, true, ident5.canDo(ACTION_WRITE, "special_bucket", "/a/b/c/d.txt")) + +} diff --git a/weed/s3api/auth_signature_v4.go b/weed/s3api/auth_signature_v4.go index 0df26e6fc..a49caad06 100644 --- a/weed/s3api/auth_signature_v4.go +++ b/weed/s3api/auth_signature_v4.go @@ -23,8 +23,7 @@ import ( "crypto/sha256" "crypto/subtle" "encoding/hex" - "github.com/chrislusf/seaweedfs/weed/s3api/s3err" - "io/ioutil" + "io" "net/http" "net/url" "regexp" @@ -33,6 +32,8 @@ import ( "strings" "time" "unicode/utf8" + + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" ) func (iam *IdentityAccessManagement) reqSignatureV4Verify(r *http.Request) (*Identity, s3err.ErrorCode) { @@ -135,9 +136,9 @@ func (iam *IdentityAccessManagement) doesSignatureMatch(hashedPayload string, r // Get hashed Payload if signV4Values.Credential.scope.service != "s3" && hashedPayload == emptySHA256 && r.Body != nil { - buf, _ := ioutil.ReadAll(r.Body) - r.Body = ioutil.NopCloser(bytes.NewBuffer(buf)) - b, _ := ioutil.ReadAll(bytes.NewBuffer(buf)) + buf, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewBuffer(buf)) + b, _ := io.ReadAll(bytes.NewBuffer(buf)) if len(b) != 0 { bodyHash := sha256.Sum256(b) hashedPayload = hex.EncodeToString(bodyHash[:]) @@ -433,7 +434,7 @@ func (iam *IdentityAccessManagement) doesPresignedSignatureMatch(hashedPayload s } } - /// Verify finally if signature is same. + // / Verify finally if signature is same. // Get canonical request. presignedCanonicalReq := getCanonicalRequest(extractedSignedHeaders, hashedPayload, encodedQuery, req.URL.Path, req.Method) diff --git a/weed/s3api/auto_signature_v4_test.go b/weed/s3api/auto_signature_v4_test.go index b47cd5f2d..a58551187 100644 --- a/weed/s3api/auto_signature_v4_test.go +++ b/weed/s3api/auto_signature_v4_test.go @@ -8,9 +8,7 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/chrislusf/seaweedfs/weed/s3api/s3err" "io" - "io/ioutil" "net/http" "net/url" "sort" @@ -19,6 +17,8 @@ import ( "testing" "time" "unicode/utf8" + + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" ) // TestIsRequestPresignedSignatureV4 - Test validates the logic for presign signature verision v4 detection. @@ -86,7 +86,7 @@ func TestIsReqAuthenticated(t *testing.T) { // Validates all testcases. for i, testCase := range testCases { if _, s3Error := iam.reqSignatureV4Verify(testCase.req); s3Error != testCase.s3Error { - ioutil.ReadAll(testCase.req.Body) + io.ReadAll(testCase.req.Body) t.Fatalf("Test %d: Unexpected S3 error: want %d - got %d", i, testCase.s3Error, s3Error) } } @@ -167,7 +167,7 @@ func newTestRequest(method, urlStr string, contentLength int64, body io.ReadSeek case body == nil: hashedPayload = getSHA256Hash([]byte{}) default: - payloadBytes, err := ioutil.ReadAll(body) + payloadBytes, err := io.ReadAll(body) if err != nil { return nil, err } diff --git a/weed/s3api/chunked_reader_v4.go b/weed/s3api/chunked_reader_v4.go index ec26f693a..2678f312f 100644 --- a/weed/s3api/chunked_reader_v4.go +++ b/weed/s3api/chunked_reader_v4.go @@ -24,6 +24,7 @@ import ( "crypto/sha256" "encoding/hex" "errors" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" "hash" "io" @@ -90,8 +91,8 @@ func (iam *IdentityAccessManagement) calculateSeedSignature(r *http.Request) (cr return nil, "", "", time.Time{}, s3err.ErrInvalidAccessKeyID } - bucket, _ := getBucketAndObject(r) - if !identity.canDo("Write", bucket) { + bucket, object := s3_constants.GetBucketAndObject(r) + if !identity.canDo(s3_constants.ACTION_WRITE, bucket, object) { errCode = s3err.ErrAccessDenied return } diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 2b6707f2e..32b93307a 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -1,17 +1,20 @@ package s3api import ( + "encoding/hex" "encoding/xml" "fmt" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "golang.org/x/exp/slices" + "math" "path/filepath" + "sort" "strconv" "strings" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" - "github.com/google/uuid" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -27,8 +30,7 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp glog.V(2).Infof("createMultipartUpload input %v", input) - uploadId, _ := uuid.NewRandom() - uploadIdString := uploadId.String() + uploadIdString := s3a.generateUploadID(*input.Key) if err := s3a.mkdir(s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) { if entry.Extended == nil { @@ -38,6 +40,9 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp for k, v := range input.Metadata { entry.Extended[k] = []byte(*v) } + if input.ContentType != nil { + entry.Attributes.Mime = *input.ContentType + } }); err != nil { glog.Errorf("NewMultipartUpload error: %v", err) return nil, s3err.ErrInternalError @@ -59,13 +64,18 @@ type CompleteMultipartUploadResult struct { s3.CompleteMultipartUploadOutput } -func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { +func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { glog.V(2).Infof("completeMultipartUpload input %v", input) + completedParts := parts.Parts + slices.SortFunc(completedParts, func(a, b CompletedPart) bool { + return a.PartNumber < b.PartNumber + }) + uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId - entries, _, err := s3a.list(uploadDirectory, "", "", false, 0) + entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList) if err != nil || len(entries) == 0 { glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries)) return nil, s3err.ErrNoSuchUpload @@ -77,11 +87,22 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa return nil, s3err.ErrNoSuchUpload } + mime := pentry.Attributes.Mime + var finalParts []*filer_pb.FileChunk var offset int64 for _, entry := range entries { if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { + partETag, found := findByPartNumber(entry.Name, completedParts) + if !found { + continue + } + entryETag := hex.EncodeToString(entry.Attributes.GetMd5()) + if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag { + glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag) + return nil, s3err.ErrInvalidPart + } for _, chunk := range entry.Chunks { p := &filer_pb.FileChunk{ FileId: chunk.GetFileIdString(), @@ -121,6 +142,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa entry.Extended[k] = v } } + if pentry.Attributes.Mime != "" { + entry.Attributes.Mime = pentry.Attributes.Mime + } else if mime != "" { + entry.Attributes.Mime = mime + } }) if err != nil { @@ -130,7 +156,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa output = &CompleteMultipartUploadResult{ CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer, dirName, entryName)), + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlPathEscape(dirName), urlPathEscape(entryName))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), Key: objectKey(input.Key), @@ -144,6 +170,31 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa return } +func findByPartNumber(fileName string, parts []CompletedPart) (etag string, found bool) { + partNumber, formatErr := strconv.Atoi(fileName[:4]) + if formatErr != nil { + return + } + x := sort.Search(len(parts), func(i int) bool { + return parts[i].PartNumber >= partNumber + }) + if x >= len(parts) { + return + } + if parts[x].PartNumber != partNumber { + return + } + y := 0 + for i, part := range parts[x:] { + if part.PartNumber == partNumber { + y = i + } else { + break + } + } + return parts[x+y].ETag, true +} + func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) { glog.V(2).Infof("abortMultipartUpload input %v", input) @@ -195,13 +246,13 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput Prefix: input.Prefix, } - entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, uint32(*input.MaxUploads)) + entries, _, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, math.MaxInt32) if err != nil { glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err) return } - output.IsTruncated = aws.Bool(!isLast) + uploadsCount := int64(0) for _, entry := range entries { if entry.Extended != nil { key := string(entry.Extended["key"]) @@ -215,9 +266,12 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput Key: objectKey(aws.String(key)), UploadId: aws.String(entry.Name), }) - if !isLast { - output.NextUploadIdMarker = aws.String(entry.Name) - } + uploadsCount += 1 + } + if uploadsCount >= *input.MaxUploads { + output.IsTruncated = aws.Bool(true) + output.NextUploadIdMarker = aws.String(entry.Name) + break } } @@ -259,6 +313,9 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP return nil, s3err.ErrNoSuchUpload } + // Note: The upload directory is sort of a marker of the existence of an multipart upload request. + // So can not just delete empty upload folders. + output.IsTruncated = aws.Bool(!isLast) for _, entry := range entries { diff --git a/weed/s3api/filer_multipart_test.go b/weed/s3api/filer_multipart_test.go index 9e1d2307b..fe2b9c0ce 100644 --- a/weed/s3api/filer_multipart_test.go +++ b/weed/s3api/filer_multipart_test.go @@ -4,6 +4,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "github.com/stretchr/testify/assert" "testing" "time" ) @@ -48,3 +49,89 @@ func TestListPartsResult(t *testing.T) { } } + +func Test_findByPartNumber(t *testing.T) { + type args struct { + fileName string + parts []CompletedPart + } + + parts := []CompletedPart{ + CompletedPart{ + ETag: "xxx", + PartNumber: 1, + }, + CompletedPart{ + ETag: "lll", + PartNumber: 1, + }, + CompletedPart{ + ETag: "yyy", + PartNumber: 3, + }, + CompletedPart{ + ETag: "zzz", + PartNumber: 5, + }, + } + + tests := []struct { + name string + args args + wantEtag string + wantFound bool + }{ + { + "first", + args{ + "0001.part", + parts, + }, + "lll", + true, + }, + { + "second", + args{ + "0002.part", + parts, + }, + "", + false, + }, + { + "third", + args{ + "0003.part", + parts, + }, + "yyy", + true, + }, + { + "fourth", + args{ + "0004.part", + parts, + }, + "", + false, + }, + { + "fifth", + args{ + "0005.part", + parts, + }, + "zzz", + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotEtag, gotFound := findByPartNumber(tt.args.fileName, tt.args.parts) + assert.Equalf(t, tt.wantEtag, gotEtag, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts) + assert.Equalf(t, tt.wantFound, gotFound, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts) + }) + } +} diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 888003e45..dbd667339 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -41,7 +41,7 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl func (s3a *S3ApiServer) rm(parentDirectoryPath, entryName string, isDeleteData, isRecursive bool) error { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err != nil { @@ -55,10 +55,11 @@ func (s3a *S3ApiServer) rm(parentDirectoryPath, entryName string, isDeleteData, func doDeleteEntry(client filer_pb.SeaweedFilerClient, parentDirectoryPath string, entryName string, isDeleteData bool, isRecursive bool) error { request := &filer_pb.DeleteEntryRequest{ - Directory: parentDirectoryPath, - Name: entryName, - IsDeleteData: isDeleteData, - IsRecursive: isRecursive, + Directory: parentDirectoryPath, + Name: entryName, + IsDeleteData: isDeleteData, + IsRecursive: isRecursive, + IgnoreRecursiveError: true, } glog.V(1).Infof("delete entry %v/%v: %v", parentDirectoryPath, entryName, request) diff --git a/weed/s3api/filer_util_tags.go b/weed/s3api/filer_util_tags.go index 75d3b37d0..18d4d69c5 100644 --- a/weed/s3api/filer_util_tags.go +++ b/weed/s3api/filer_util_tags.go @@ -1,19 +1,19 @@ package s3api import ( + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "strings" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" ) const ( - S3TAG_PREFIX = xhttp.AmzObjectTagging + "-" + S3TAG_PREFIX = s3_constants.AmzObjectTagging + "-" ) func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) { - err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, @@ -35,7 +35,7 @@ func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (t func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, tags map[string]string) (err error) { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, @@ -71,7 +71,7 @@ func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, ta func (s3a *S3ApiServer) rmTags(parentDirectoryPath string, entryName string) (err error) { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, diff --git a/weed/s3api/http/header.go b/weed/s3api/http/header.go deleted file mode 100644 index 6614b0af0..000000000 --- a/weed/s3api/http/header.go +++ /dev/null @@ -1,36 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2019 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package http - -// Standard S3 HTTP request constants -const ( - // S3 storage class - AmzStorageClass = "x-amz-storage-class" - - // S3 user-defined metadata - AmzUserMetaPrefix = "X-Amz-Meta-" - - // S3 object tagging - AmzObjectTagging = "X-Amz-Tagging" - AmzTagCount = "x-amz-tagging-count" -) - -// Non-Standard S3 HTTP request constants -const ( - AmzIdentityId = "s3-identity-id" - AmzIsAdmin = "s3-is-admin" // only set to http request header as a context -) diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go new file mode 100644 index 000000000..cd725d435 --- /dev/null +++ b/weed/s3api/s3_constants/header.go @@ -0,0 +1,66 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package s3_constants + +import ( + "github.com/gorilla/mux" + "net/http" + "strings" +) + +// Standard S3 HTTP request constants +const ( + // S3 storage class + AmzStorageClass = "x-amz-storage-class" + + // S3 user-defined metadata + AmzUserMetaPrefix = "X-Amz-Meta-" + AmzUserMetaDirective = "X-Amz-Metadata-Directive" + + // S3 object tagging + AmzObjectTagging = "X-Amz-Tagging" + AmzObjectTaggingPrefix = "X-Amz-Tagging-" + AmzObjectTaggingDirective = "X-Amz-Tagging-Directive" + AmzTagCount = "x-amz-tagging-count" +) + +// Non-Standard S3 HTTP request constants +const ( + AmzIdentityId = "s3-identity-id" + AmzAuthType = "s3-auth-type" + AmzIsAdmin = "s3-is-admin" // only set to http request header as a context +) + +func GetBucketAndObject(r *http.Request) (bucket, object string) { + vars := mux.Vars(r) + bucket = vars["bucket"] + object = vars["object"] + if !strings.HasPrefix(object, "/") { + object = "/" + object + } + + return +} + +var PassThroughHeaders = map[string]string{ + "response-cache-control": "Cache-Control", + "response-content-disposition": "Content-Disposition", + "response-content-encoding": "Content-Encoding", + "response-content-language": "Content-Language", + "response-content-type": "Content-Type", + "response-expires": "Expires", +} diff --git a/weed/s3api/s3_constants/s3_actions.go b/weed/s3api/s3_constants/s3_actions.go index 4e484ac98..0fbf134e3 100644 --- a/weed/s3api/s3_constants/s3_actions.go +++ b/weed/s3api/s3_constants/s3_actions.go @@ -6,4 +6,6 @@ const ( ACTION_ADMIN = "Admin" ACTION_TAGGING = "Tagging" ACTION_LIST = "List" + + SeaweedStorageDestinationHeader = "x-seaweedfs-destination" ) diff --git a/weed/s3api/s3_constants/s3_config.go b/weed/s3api/s3_constants/s3_config.go new file mode 100644 index 000000000..0fa5b26f4 --- /dev/null +++ b/weed/s3api/s3_constants/s3_config.go @@ -0,0 +1,18 @@ +package s3_constants + +import ( + "strings" +) + +var ( + CircuitBreakerConfigDir = "/etc/s3" + CircuitBreakerConfigFile = "circuit_breaker.json" + AllowedActions = []string{ACTION_READ, ACTION_WRITE, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN} + LimitTypeCount = "Count" + LimitTypeBytes = "MB" + Separator = ":" +) + +func Concat(elements ...string) string { + return strings.Join(elements, Separator) +} diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 8beb954aa..f70e46b92 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -3,13 +3,16 @@ package s3api import ( "context" "encoding/xml" + "errors" "fmt" - "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "math" "net/http" "time" - xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" "github.com/aws/aws-sdk-go/aws" @@ -27,12 +30,14 @@ type ListAllMyBucketsResult struct { func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) { + glog.V(3).Infof("ListBucketsHandler") + var identity *Identity var s3Err s3err.ErrorCode if s3a.iam.isEnabled() { identity, s3Err = s3a.iam.authUser(r) if s3Err != s3err.ErrNone { - s3err.WriteErrorResponse(w, s3Err, r) + s3err.WriteErrorResponse(w, r, s3Err) return } } @@ -42,16 +47,16 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques entries, _, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - identityId := r.Header.Get(xhttp.AmzIdentityId) + identityId := r.Header.Get(s3_constants.AmzIdentityId) var buckets []*s3.Bucket for _, entry := range entries { if entry.IsDirectory { - if identity != nil && !identity.canDo(s3_constants.ACTION_LIST, entry.Name) { + if identity != nil && !identity.canDo(s3_constants.ACTION_LIST, entry.Name, "") { continue } buckets = append(buckets, &s3.Bucket{ @@ -69,16 +74,17 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques Buckets: buckets, } - writeSuccessResponseXML(w, response) + writeSuccessResponseXML(w, r, response) } func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) { - bucket, _ := getBucketAndObject(r) + bucket, _ := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("PutBucketHandler %s", bucket) // avoid duplicated buckets errCode := s3err.ErrNone - if err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{ IncludeEcVolumes: true, IncludeNormalVolumes: true, @@ -95,46 +101,63 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) } return nil }); err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } if exist, err := s3a.exists(s3a.option.BucketsPath, bucket, true); err == nil && exist { errCode = s3err.ErrBucketAlreadyExists } if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } + if s3a.iam.isEnabled() { + if _, errCode = s3a.iam.authRequest(r, s3_constants.ACTION_ADMIN); errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } + } + fn := func(entry *filer_pb.Entry) { - if identityId := r.Header.Get(xhttp.AmzIdentityId); identityId != "" { + if identityId := r.Header.Get(s3_constants.AmzIdentityId); identityId != "" { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } - entry.Extended[xhttp.AmzIdentityId] = []byte(identityId) + entry.Extended[s3_constants.AmzIdentityId] = []byte(identityId) } } // create the folder for bucket, but lazily create actual collection if err := s3a.mkdir(s3a.option.BucketsPath, bucket, fn); err != nil { glog.Errorf("PutBucketHandler mkdir: %v", err) - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - - writeSuccessResponseEmpty(w) + w.Header().Set("Location", "/"+bucket) + writeSuccessResponseEmpty(w, r) } func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) { - bucket, _ := getBucketAndObject(r) + bucket, _ := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("DeleteBucketHandler %s", bucket) if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { - s3err.WriteErrorResponse(w, err, r) + s3err.WriteErrorResponse(w, r, err) return } - err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + if !s3a.option.AllowDeleteBucketNotEmpty { + entries, _, err := s3a.list(s3a.option.BucketsPath+"/"+bucket, "", "", false, 1) + if err != nil { + return fmt.Errorf("failed to list bucket %s: %v", bucket, err) + } + if len(entries) > 0 { + return errors.New(s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code) + } + } // delete collection deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{ @@ -149,26 +172,36 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque return nil }) + if err != nil { + s3ErrorCode := s3err.ErrInternalError + if err.Error() == s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code { + s3ErrorCode = s3err.ErrBucketNotEmpty + } + s3err.WriteErrorResponse(w, r, s3ErrorCode) + return + } + err = s3a.rm(s3a.option.BucketsPath, bucket, false, true) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - s3err.WriteEmptyResponse(w, http.StatusNoContent) + s3err.WriteEmptyResponse(w, r, http.StatusNoContent) } func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) { - bucket, _ := getBucketAndObject(r) + bucket, _ := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("HeadBucketHandler %s", bucket) - if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { - s3err.WriteErrorResponse(w, err, r) + if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); entry == nil || err == filer_pb.ErrNotFound { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } - writeSuccessResponseEmpty(w) + writeSuccessResponseEmpty(w, r) } func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode { @@ -184,7 +217,7 @@ func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorC } func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool { - isAdmin := r.Header.Get(xhttp.AmzIsAdmin) != "" + isAdmin := r.Header.Get(s3_constants.AmzIsAdmin) != "" if isAdmin { return true } @@ -192,11 +225,119 @@ func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool { return true } - identityId := r.Header.Get(xhttp.AmzIdentityId) - if id, ok := entry.Extended[xhttp.AmzIdentityId]; ok { + identityId := r.Header.Get(s3_constants.AmzIdentityId) + if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok { if identityId != string(id) { return false } } return true } + +// GetBucketAclHandler Get Bucket ACL +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketAcl.html +func (s3a *S3ApiServer) GetBucketAclHandler(w http.ResponseWriter, r *http.Request) { + // collect parameters + bucket, _ := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("GetBucketAclHandler %s", bucket) + + if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, err) + return + } + + response := AccessControlPolicy{} + for _, ident := range s3a.iam.identities { + if len(ident.Credentials) == 0 { + continue + } + for _, action := range ident.Actions { + if !action.overBucket(bucket) || action.getPermission() == "" { + continue + } + id := ident.Credentials[0].AccessKey + if response.Owner.DisplayName == "" && action.isOwner(bucket) && len(ident.Credentials) > 0 { + response.Owner.DisplayName = ident.Name + response.Owner.ID = id + } + response.AccessControlList.Grant = append(response.AccessControlList.Grant, Grant{ + Grantee: Grantee{ + ID: id, + DisplayName: ident.Name, + Type: "CanonicalUser", + XMLXSI: "CanonicalUser", + XMLNS: "http://www.w3.org/2001/XMLSchema-instance"}, + Permission: action.getPermission(), + }) + } + } + writeSuccessResponseXML(w, r, response) +} + +// GetBucketLifecycleConfigurationHandler Get Bucket Lifecycle configuration +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html +func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) { + // collect parameters + bucket, _ := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("GetBucketLifecycleConfigurationHandler %s", bucket) + + if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, err) + return + } + fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil) + if err != nil { + glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + ttls := fc.GetCollectionTtls(bucket) + if len(ttls) == 0 { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration) + return + } + response := Lifecycle{} + for prefix, internalTtl := range ttls { + ttl, _ := needle.ReadTTL(internalTtl) + days := int(ttl.Minutes() / 60 / 24) + if days == 0 { + continue + } + response.Rules = append(response.Rules, Rule{ + Status: Enabled, Filter: Filter{ + Prefix: Prefix{string: prefix, set: true}, + set: true, + }, + Expiration: Expiration{Days: days, set: true}, + }) + } + writeSuccessResponseXML(w, r, response) +} + +// PutBucketLifecycleConfigurationHandler Put Bucket Lifecycle configuration +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html +func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) { + + s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented) + +} + +// DeleteBucketMetricsConfiguration Delete Bucket Lifecycle +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html +func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) { + + s3err.WriteEmptyResponse(w, r, http.StatusNoContent) + +} + +// GetBucketLocationHandler Get bucket location +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html +func (s3a *S3ApiServer) GetBucketLocationHandler(w http.ResponseWriter, r *http.Request) { + writeSuccessResponseXML(w, r, LocationConstraint{}) +} + +// GetBucketRequestPaymentHandler Get bucket location +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketRequestPayment.html +func (s3a *S3ApiServer) GetBucketRequestPaymentHandler(w http.ResponseWriter, r *http.Request) { + writeSuccessResponseXML(w, r, RequestPaymentConfiguration{Payer: "BucketOwner"}) +} diff --git a/weed/s3api/s3api_bucket_skip_handlers.go b/weed/s3api/s3api_bucket_skip_handlers.go new file mode 100644 index 000000000..f4ca1177d --- /dev/null +++ b/weed/s3api/s3api_bucket_skip_handlers.go @@ -0,0 +1,49 @@ +package s3api + +import ( + "net/http" + + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" +) + +// GetBucketCorsHandler Get bucket CORS +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketCors.html +func (s3a *S3ApiServer) GetBucketCorsHandler(w http.ResponseWriter, r *http.Request) { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchCORSConfiguration) +} + +// PutBucketCorsHandler Put bucket CORS +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketCors.html +func (s3a *S3ApiServer) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) { + s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented) +} + +// DeleteBucketCorsHandler Delete bucket CORS +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketCors.html +func (s3a *S3ApiServer) DeleteBucketCorsHandler(w http.ResponseWriter, r *http.Request) { + s3err.WriteErrorResponse(w, r, http.StatusNoContent) +} + +// GetBucketPolicyHandler Get bucket Policy +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketPolicy.html +func (s3a *S3ApiServer) GetBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucketPolicy) +} + +// PutBucketPolicyHandler Put bucket Policy +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketPolicy.html +func (s3a *S3ApiServer) PutBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { + s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented) +} + +// DeleteBucketPolicyHandler Delete bucket Policy +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketPolicy.html +func (s3a *S3ApiServer) DeleteBucketPolicyHandler(w http.ResponseWriter, r *http.Request) { + s3err.WriteErrorResponse(w, r, http.StatusNoContent) +} + +// PutBucketAclHandler Put bucket ACL +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketAcl.html +func (s3a *S3ApiServer) PutBucketAclHandler(w http.ResponseWriter, r *http.Request) { + s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented) +} diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go new file mode 100644 index 000000000..111b404c7 --- /dev/null +++ b/weed/s3api/s3api_circuit_breaker.go @@ -0,0 +1,183 @@ +package s3api + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "github.com/gorilla/mux" + "net/http" + "sync" + "sync/atomic" +) + +type CircuitBreaker struct { + sync.RWMutex + Enabled bool + counters map[string]*int64 + limitations map[string]int64 +} + +func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { + cb := &CircuitBreaker{ + counters: make(map[string]*int64), + limitations: make(map[string]int64), + } + + err := pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile) + if err != nil { + return fmt.Errorf("read S3 circuit breaker config: %v", err) + } + return cb.LoadS3ApiConfigurationFromBytes(content) + }) + + if err != nil { + glog.Warningf("fail to load config: %v", err) + } + + return cb +} + +func (cb *CircuitBreaker) LoadS3ApiConfigurationFromBytes(content []byte) error { + cbCfg := &s3_pb.S3CircuitBreakerConfig{} + if err := filer.ParseS3ConfigurationFromBytes(content, cbCfg); err != nil { + glog.Warningf("unmarshal error: %v", err) + return fmt.Errorf("unmarshal error: %v", err) + } + if err := cb.loadCircuitBreakerConfig(cbCfg); err != nil { + return err + } + return nil +} + +func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerConfig) error { + + //global + globalEnabled := false + globalOptions := cfg.Global + limitations := make(map[string]int64) + if globalOptions != nil && globalOptions.Enabled && len(globalOptions.Actions) > 0 { + globalEnabled = globalOptions.Enabled + for action, limit := range globalOptions.Actions { + limitations[action] = limit + } + } + cb.Enabled = globalEnabled + + //buckets + for bucket, cbOptions := range cfg.Buckets { + if cbOptions.Enabled { + for action, limit := range cbOptions.Actions { + limitations[s3_constants.Concat(bucket, action)] = limit + } + } + } + + cb.limitations = limitations + return nil +} + +func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) { + return func(w http.ResponseWriter, r *http.Request) { + if !cb.Enabled { + f(w, r) + return + } + + vars := mux.Vars(r) + bucket := vars["bucket"] + + rollback, errCode := cb.limit(r, bucket, action) + defer func() { + for _, rf := range rollback { + rf() + } + }() + + if errCode == s3err.ErrNone { + f(w, r) + return + } + s3err.WriteErrorResponse(w, r, errCode) + }, Action(action) +} + +func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) { + + //bucket simultaneous request count + bucketCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest) + if bucketCountRollBack != nil { + rollback = append(rollback, bucketCountRollBack) + } + if errCode != s3err.ErrNone { + return + } + + //bucket simultaneous request content bytes + bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed) + if bucketContentLengthRollBack != nil { + rollback = append(rollback, bucketContentLengthRollBack) + } + if errCode != s3err.ErrNone { + return + } + + //global simultaneous request count + globalCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest) + if globalCountRollBack != nil { + rollback = append(rollback, globalCountRollBack) + } + if errCode != s3err.ErrNone { + return + } + + //global simultaneous request content bytes + globalContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed) + if globalContentLengthRollBack != nil { + rollback = append(rollback, globalContentLengthRollBack) + } + if errCode != s3err.ErrNone { + return + } + return +} + +func (cb *CircuitBreaker) loadCounterAndCompare(key string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) { + e = s3err.ErrNone + if max, ok := cb.limitations[key]; ok { + cb.RLock() + counter, exists := cb.counters[key] + cb.RUnlock() + + if !exists { + cb.Lock() + counter, exists = cb.counters[key] + if !exists { + var newCounter int64 + counter = &newCounter + cb.counters[key] = counter + } + cb.Unlock() + } + current := atomic.LoadInt64(counter) + if current+inc > max { + e = errCode + return + } else { + current := atomic.AddInt64(counter, inc) + f = func() { + atomic.AddInt64(counter, -inc) + } + if current > max { + e = errCode + return + } + } + } + return +} diff --git a/weed/s3api/s3api_circuit_breaker_test.go b/weed/s3api/s3api_circuit_breaker_test.go new file mode 100644 index 000000000..5848cf164 --- /dev/null +++ b/weed/s3api/s3api_circuit_breaker_test.go @@ -0,0 +1,107 @@ +package s3api + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "net/http" + "sync" + "sync/atomic" + "testing" +) + +type TestLimitCase struct { + actionName string + + limitType string + bucketLimitValue int64 + globalLimitValue int64 + + routineCount int + successCount int64 +} + +var ( + bucket = "/test" + action = s3_constants.ACTION_WRITE + fileSize int64 = 200 + + TestLimitCases = []*TestLimitCase{ + + //bucket-LimitTypeCount + {action, s3_constants.LimitTypeCount, 5, 6, 60, 5}, + {action, s3_constants.LimitTypeCount, 0, 6, 6, 0}, + + //global-LimitTypeCount + {action, s3_constants.LimitTypeCount, 6, 5, 6, 5}, + {action, s3_constants.LimitTypeCount, 6, 0, 6, 0}, + + //bucket-LimitTypeBytes + {action, s3_constants.LimitTypeBytes, 1000, 1020, 6, 5}, + {action, s3_constants.LimitTypeBytes, 0, 1020, 6, 0}, + + //global-LimitTypeBytes + {action, s3_constants.LimitTypeBytes, 1020, 1000, 6, 5}, + {action, s3_constants.LimitTypeBytes, 1020, 0, 6, 0}, + } +) + +func TestLimit(t *testing.T) { + for _, tc := range TestLimitCases { + circuitBreakerConfig := &s3_pb.S3CircuitBreakerConfig{ + Global: &s3_pb.S3CircuitBreakerOptions{ + Enabled: true, + Actions: map[string]int64{ + s3_constants.Concat(tc.actionName, tc.limitType): tc.globalLimitValue, + s3_constants.Concat(tc.actionName, tc.limitType): tc.globalLimitValue, + }, + }, + Buckets: map[string]*s3_pb.S3CircuitBreakerOptions{ + bucket: { + Enabled: true, + Actions: map[string]int64{ + s3_constants.Concat(tc.actionName, tc.limitType): tc.bucketLimitValue, + }, + }, + }, + } + circuitBreaker := &CircuitBreaker{ + counters: make(map[string]*int64), + limitations: make(map[string]int64), + } + err := circuitBreaker.loadCircuitBreakerConfig(circuitBreakerConfig) + if err != nil { + t.Fatal(err) + } + + successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: fileSize}, tc.actionName) + if successCount != tc.successCount { + t.Errorf("successCount not equal, expect=%d, actual=%d, case: %v", tc.successCount, successCount, tc) + } + } +} + +func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request, action string) int64 { + var successCounter int64 + resultCh := make(chan []func(), routineCount) + var wg sync.WaitGroup + for i := 0; i < routineCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + rollbackFn, errCode := circuitBreaker.limit(r, bucket, action) + if errCode == s3err.ErrNone { + atomic.AddInt64(&successCounter, 1) + } + resultCh <- rollbackFn + }() + } + wg.Wait() + close(resultCh) + for fns := range resultCh { + for _, fn := range fns { + fn() + } + } + return successCounter +} diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index b4d2c22e7..4ace4bb21 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -13,24 +13,26 @@ import ( var _ = filer_pb.FilerClient(&S3ApiServer{}) -func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption) + }, s3a.option.Filer.ToGrpcAddress(), s3a.option.GrpcDialOption) } + func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string { return location.Url } -func writeSuccessResponseXML(w http.ResponseWriter, response interface{}) { - s3err.WriteXMLResponse(w, http.StatusOK, response) +func writeSuccessResponseXML(w http.ResponseWriter, r *http.Request, response interface{}) { + s3err.WriteXMLResponse(w, r, http.StatusOK, response) + s3err.PostLog(r, http.StatusOK, s3err.ErrNone) } -func writeSuccessResponseEmpty(w http.ResponseWriter) { - s3err.WriteEmptyResponse(w, http.StatusOK) +func writeSuccessResponseEmpty(w http.ResponseWriter, r *http.Request) { + s3err.WriteEmptyResponse(w, r, http.StatusOK) } func validateContentMd5(h http.Header) ([]byte, error) { diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go index 60dd54152..9157748f6 100644 --- a/weed/s3api/s3api_object_copy_handlers.go +++ b/weed/s3api/s3api_object_copy_handlers.go @@ -3,8 +3,9 @@ package s3api import ( "fmt" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" - weed_server "github.com/chrislusf/seaweedfs/weed/server" + "modernc.org/strutil" "net/http" "net/url" "strconv" @@ -14,9 +15,14 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +const ( + DirectiveCopy = "COPY" + DirectiveReplace = "REPLACE" +) + func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { - dstBucket, dstObject := getBucketAndObject(r) + dstBucket, dstObject := s3_constants.GetBucketAndObject(r) // Copy source path. cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source")) @@ -27,19 +33,25 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request srcBucket, srcObject := pathToBucketAndObject(cpSrcPath) - if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && isReplace(r) { + glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject) + + replaceMeta, replaceTagging := replaceDirective(r.Header) + + if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) { fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) dir, name := fullPath.DirAndName() entry, err := s3a.getEntry(dir, name) - if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r) + if err != nil || entry.IsDirectory { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return } - entry.Extended = weed_server.SaveAmzMetaData(r, entry.Extended, isReplace(r)) + entry.Extended = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging) err = s3a.touch(dir, name, entry) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return } - writeSuccessResponseXML(w, CopyObjectResult{ + writeSuccessResponseXML(w, r, CopyObjectResult{ ETag: fmt.Sprintf("%x", entry.Attributes.Md5), LastModified: time.Now().UTC(), }) @@ -48,32 +60,44 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request // If source object is empty or bucket is empty, reply back invalid copy source. if srcObject == "" || srcBucket == "" { - s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return + } + srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) + dir, name := srcPath.DirAndName() + if entry, err := s3a.getEntry(dir, name); err != nil || entry.IsDirectory { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } if srcBucket == dstBucket && srcObject == dstObject { - s3err.WriteErrorResponse(w, s3err.ErrInvalidCopyDest, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopyDest) return } dstUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s", - s3a.option.Filer, s3a.option.BucketsPath, dstBucket, dstObject, dstBucket) + s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlPathEscape(dstObject), dstBucket) srcUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer, s3a.option.BucketsPath, srcBucket, srcObject) + s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject)) - _, _, resp, err := util.DownloadFile(srcUrl, "") + _, _, resp, err := util.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false)) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } defer util.CloseResponse(resp) + tagErr := processMetadata(r.Header, resp.Header, replaceMeta, replaceTagging, s3a.getTags, dir, name) + if tagErr != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return + } glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl) - etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body) + destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject) + etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body, destination) if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } @@ -84,7 +108,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request LastModified: time.Now().UTC(), } - writeSuccessResponseXML(w, response) + writeSuccessResponseXML(w, r, response) } @@ -105,7 +129,7 @@ type CopyPartResult struct { func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) { // https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html // https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html - dstBucket, _ := getBucketAndObject(r) + dstBucket, dstObject := s3_constants.GetBucketAndObject(r) // Copy source path. cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source")) @@ -117,7 +141,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req srcBucket, srcObject := pathToBucketAndObject(cpSrcPath) // If source object is empty or bucket is empty, reply back invalid copy source. if srcObject == "" || srcBucket == "" { - s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } @@ -126,35 +150,38 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req partID, err := strconv.Atoi(partIDString) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInvalidPart, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) return } + glog.V(3).Infof("CopyObjectPartHandler %s %s => %s part %d", srcBucket, srcObject, dstBucket, partID) + // check partID with maximum part ID for multipart objects if partID > globalMaxPartID { - s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxParts, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts) return } rangeHeader := r.Header.Get("x-amz-copy-source-range") dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s", - s3a.option.Filer, s3a.genUploadsFolder(dstBucket), uploadID, partID, dstBucket) + s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID, dstBucket) srcUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer, s3a.option.BucketsPath, srcBucket, srcObject) + s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject)) - dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, rangeHeader) + dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } defer dataReader.Close() glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl) - etag, errCode := s3a.putToFiler(r, dstUrl, dataReader) + destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject) + etag, errCode := s3a.putToFiler(r, dstUrl, dataReader, destination) if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } @@ -165,10 +192,111 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req LastModified: time.Now().UTC(), } - writeSuccessResponseXML(w, response) + writeSuccessResponseXML(w, r, response) } -func isReplace(r *http.Request) bool { - return r.Header.Get("X-Amz-Metadata-Directive") == "REPLACE" +func replaceDirective(reqHeader http.Header) (replaceMeta, replaceTagging bool) { + return reqHeader.Get(s3_constants.AmzUserMetaDirective) == DirectiveReplace, reqHeader.Get(s3_constants.AmzObjectTaggingDirective) == DirectiveReplace +} + +func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTagging bool, getTags func(parentDirectoryPath string, entryName string) (tags map[string]string, err error), dir, name string) (err error) { + if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) == 0 { + if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 { + reqHeader[s3_constants.AmzStorageClass] = sc + } + } + + if !replaceMeta { + for header, _ := range reqHeader { + if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) { + delete(reqHeader, header) + } + } + for k, v := range existing { + if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) { + reqHeader[k] = v + } + } + } + + if !replaceTagging { + for header, _ := range reqHeader { + if strings.HasPrefix(header, s3_constants.AmzObjectTagging) { + delete(reqHeader, header) + } + } + + found := false + for k, _ := range existing { + if strings.HasPrefix(k, s3_constants.AmzObjectTaggingPrefix) { + found = true + break + } + } + + if found { + tags, err := getTags(dir, name) + if err != nil { + return err + } + + var tagArr []string + for k, v := range tags { + tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, v)) + } + tagStr := strutil.JoinFields(tagArr, "&") + reqHeader.Set(s3_constants.AmzObjectTagging, tagStr) + } + } + return +} + +func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte) { + metadata = make(map[string][]byte) + + if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 { + metadata[s3_constants.AmzStorageClass] = sc + } + if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) > 0 { + metadata[s3_constants.AmzStorageClass] = []byte(sc) + } + + if replaceMeta { + for header, values := range reqHeader { + if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) { + for _, value := range values { + metadata[header] = []byte(value) + } + } + } + } else { + for k, v := range existing { + if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) { + metadata[k] = v + } + } + } + + if replaceTagging { + if tags := reqHeader.Get(s3_constants.AmzObjectTagging); tags != "" { + for _, v := range strings.Split(tags, "&") { + tag := strings.Split(v, "=") + if len(tag) == 2 { + metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1]) + } else if len(tag) == 1 { + metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = nil + } + } + } + } else { + for k, v := range existing { + if strings.HasPrefix(k, s3_constants.AmzObjectTagging) { + metadata[k] = v + } + } + delete(metadata, s3_constants.AmzTagCount) + } + + return } diff --git a/weed/s3api/s3api_object_copy_handlers_test.go b/weed/s3api/s3api_object_copy_handlers_test.go new file mode 100644 index 000000000..610b29a6b --- /dev/null +++ b/weed/s3api/s3api_object_copy_handlers_test.go @@ -0,0 +1,426 @@ +package s3api + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "net/http" + "reflect" + "sort" + "strings" + "testing" +) + +type H map[string]string + +func (h H) String() string { + pairs := make([]string, 0, len(h)) + for k, v := range h { + pairs = append(pairs, fmt.Sprintf("%s : %s", k, v)) + } + sort.Strings(pairs) + join := strings.Join(pairs, "\n") + return "\n" + join + "\n" +} + +var processMetadataTestCases = []struct { + caseId int + request H + existing H + getTags H + want H +}{ + { + 201, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging": "A=B&a=b&type=existing", + }, + }, + { + 202, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=existing", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + }, + }, + + { + 203, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, + + { + 204, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, + + { + 205, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{}, + H{}, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, + + { + 206, + H{ + "User-Agent": "firefox", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, + + { + 207, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, +} +var processMetadataBytesTestCases = []struct { + caseId int + request H + existing H + want H +}{ + { + 101, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + }, + + { + 102, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{ + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + }, + + { + 103, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "request", + }, + }, + + { + 104, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{ + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "request", + }, + }, + + { + 105, + H{ + "User-Agent": "firefox", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{}, + }, + + { + 107, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + s3_constants.AmzUserMetaDirective: DirectiveReplace, + s3_constants.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{}, + H{ + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "request", + }, + }, +} + +func TestProcessMetadata(t *testing.T) { + for _, tc := range processMetadataTestCases { + reqHeader := transferHToHeader(tc.request) + existing := transferHToHeader(tc.existing) + replaceMeta, replaceTagging := replaceDirective(reqHeader) + + err := processMetadata(reqHeader, existing, replaceMeta, replaceTagging, func(_ string, _ string) (tags map[string]string, err error) { + return tc.getTags, nil + }, "", "") + if err != nil { + t.Error(err) + } + + result := transferHeaderToH(reqHeader) + fmtTagging(result, tc.want) + + if !reflect.DeepEqual(result, tc.want) { + t.Error(fmt.Errorf("\n### CaseID: %d ###"+ + "\nRequest:%v"+ + "\nExisting:%v"+ + "\nGetTags:%v"+ + "\nWant:%v"+ + "\nActual:%v", + tc.caseId, tc.request, tc.existing, tc.getTags, tc.want, result)) + } + } +} + +func TestProcessMetadataBytes(t *testing.T) { + for _, tc := range processMetadataBytesTestCases { + reqHeader := transferHToHeader(tc.request) + existing := transferHToBytesArr(tc.existing) + replaceMeta, replaceTagging := replaceDirective(reqHeader) + extends := processMetadataBytes(reqHeader, existing, replaceMeta, replaceTagging) + + result := transferBytesArrToH(extends) + fmtTagging(result, tc.want) + + if !reflect.DeepEqual(result, tc.want) { + t.Error(fmt.Errorf("\n### CaseID: %d ###"+ + "\nRequest:%v"+ + "\nExisting:%v"+ + "\nWant:%v"+ + "\nActual:%v", + tc.caseId, tc.request, tc.existing, tc.want, result)) + } + } +} + +func fmtTagging(maps ...map[string]string) { + for _, m := range maps { + if tagging := m[s3_constants.AmzObjectTagging]; len(tagging) > 0 { + split := strings.Split(tagging, "&") + sort.Strings(split) + m[s3_constants.AmzObjectTagging] = strings.Join(split, "&") + } + } +} + +func transferHToHeader(data map[string]string) http.Header { + header := http.Header{} + for k, v := range data { + header.Add(k, v) + } + return header +} + +func transferHToBytesArr(data map[string]string) map[string][]byte { + m := make(map[string][]byte, len(data)) + for k, v := range data { + m[k] = []byte(v) + } + return m +} + +func transferBytesArrToH(data map[string][]byte) H { + m := make(map[string]string, len(data)) + for k, v := range data { + m[k] = string(v) + } + return m +} + +func transferHeaderToH(data map[string][]string) H { + m := make(map[string]string, len(data)) + for k, v := range data { + m[k] = v[len(v)-1] + } + return m +} diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 845c9a577..4ad3454ba 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -1,23 +1,25 @@ package s3api import ( + "bytes" "crypto/md5" "encoding/json" "encoding/xml" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/pquerna/cachecontrol/cacheobject" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util/mem" + "golang.org/x/exp/slices" "io" - "io/ioutil" "net/http" "net/url" - "sort" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/pquerna/cachecontrol/cacheobject" - "github.com/gorilla/mux" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -25,39 +27,43 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -var ( - client *http.Client +const ( + deleteMultipleObjectsLimmit = 1000 ) -func init() { - client = &http.Client{Transport: &http.Transport{ - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} +func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { + mimeBuffer := make([]byte, 512) + size, _ := dataReader.Read(mimeBuffer) + if size > 0 { + r.Header.Set("Content-Type", http.DetectContentType(mimeBuffer[:size])) + return io.NopCloser(io.MultiReader(bytes.NewReader(mimeBuffer[:size]), dataReader)) + } + return io.NopCloser(dataReader) } func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) { // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html - bucket, object := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("PutObjectHandler %s %s", bucket, object) _, err := validateContentMd5(r.Header) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInvalidDigest, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest) return } if r.Header.Get("Cache-Control") != "" { if _, err = cacheobject.ParseRequestCacheControl(r.Header.Get("Cache-Control")); err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInvalidDigest, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest) return } } if r.Header.Get("Expires") != "" { if _, err = time.Parse(http.TimeFormat, r.Header.Get("Expires")); err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInvalidDigest, r) + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedExpires) return } } @@ -75,36 +81,45 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) } if s3ErrCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, s3ErrCode, r) + s3err.WriteErrorResponse(w, r, s3ErrCode) return } } else { if authTypeStreamingSigned == rAuthType { - s3err.WriteErrorResponse(w, s3err.ErrAuthNotSetup, r) + s3err.WriteErrorResponse(w, r, s3err.ErrAuthNotSetup) return } } defer dataReader.Close() + objectContentType := r.Header.Get("Content-Type") if strings.HasSuffix(object, "/") { - if err := s3a.mkdir(s3a.option.BucketsPath, bucket+object, nil); err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + if err := s3a.mkdir(s3a.option.BucketsPath, bucket+strings.TrimSuffix(object, "/"), func(entry *filer_pb.Entry) { + if objectContentType == "" { + objectContentType = "httpd/unix-directory" + } + entry.Attributes.Mime = objectContentType + }); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } } else { - uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) + uploadUrl := s3a.toFilerUrl(bucket, object) + if objectContentType == "" { + dataReader = mimeDetect(r, dataReader) + } - etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader) + etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "") if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } setEtag(w, etag) } - writeSuccessResponseEmpty(w) + writeSuccessResponseEmpty(w, r) } func urlPathEscape(object string) string { @@ -115,45 +130,51 @@ func urlPathEscape(object string) string { return strings.Join(escapedParts, "/") } +func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string { + destUrl := fmt.Sprintf("http://%s%s/%s%s", + s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlPathEscape(object)) + return destUrl +} + func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("GetObjectHandler %s %s", bucket, object) if strings.HasSuffix(r.URL.Path, "/") { - s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented) return } - destUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) - - s3a.proxyToFiler(w, r, destUrl, passThroughResponse) + destUrl := s3a.toFilerUrl(bucket, object) + s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse) } func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) - - destUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) + bucket, object := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object) - s3a.proxyToFiler(w, r, destUrl, passThroughResponse) + destUrl := s3a.toFilerUrl(bucket, object) + s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse) } func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object) - destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true", - s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) + destUrl := s3a.toFilerUrl(bucket, object) - s3a.proxyToFiler(w, r, destUrl, func(proxyResponse *http.Response, w http.ResponseWriter) { + s3a.proxyToFiler(w, r, destUrl, true, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int) { + statusCode = http.StatusNoContent for k, v := range proxyResponse.Header { w.Header()[k] = v } - w.WriteHeader(http.StatusNoContent) + w.WriteHeader(statusCode) + return statusCode }) } @@ -191,30 +212,39 @@ type DeleteObjectsResponse struct { // DeleteMultipleObjectsHandler - Delete multiple objects func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) { - bucket, _ := getBucketAndObject(r) + bucket, _ := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("DeleteMultipleObjectsHandler %s", bucket) - deleteXMLBytes, err := ioutil.ReadAll(r.Body) + deleteXMLBytes, err := io.ReadAll(r.Body) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } deleteObjects := &DeleteObjectsRequest{} if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil { - s3err.WriteErrorResponse(w, s3err.ErrMalformedXML, r) + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML) + return + } + + if len(deleteObjects.Objects) > deleteMultipleObjectsLimmit { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxDeleteObjects) return } var deletedObjects []ObjectIdentifier var deleteErrors []DeleteError + var auditLog *s3err.AccessLog directoriesWithDeletion := make(map[string]int) - s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if s3err.Logger != nil { + auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) + } + s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // delete file entries for _, object := range deleteObjects.Objects { - lastSeparator := strings.LastIndex(object.ObjectName, "/") parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.ObjectName, true, false if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) { @@ -237,6 +267,10 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h Key: object.ObjectName, }) } + if auditLog != nil { + auditLog.Key = entryName + s3err.PostAccessLog(*auditLog) + } } // purge empty folders, only checking folders with deletions @@ -253,7 +287,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } deleteResp.Errors = deleteErrors - writeSuccessResponseXML(w, deleteResp) + writeSuccessResponseXML(w, r, deleteResp) } @@ -262,8 +296,8 @@ func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerCli for dir, _ := range directoriesWithDeletion { allDirs = append(allDirs, dir) } - sort.Slice(allDirs, func(i, j int) bool { - return len(allDirs[i]) > len(allDirs[j]) + slices.SortFunc(allDirs, func(a, b string) bool { + return len(a) > len(b) }) newDirectoriesWithDeletion = make(map[string]int) for _, dir := range allDirs { @@ -280,87 +314,81 @@ func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerCli return } -var passThroughHeaders = []string{ - "response-cache-control", - "response-content-disposition", - "response-content-encoding", - "response-content-language", - "response-content-type", - "response-expires", -} - -func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter)) { +func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int)) { - glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl) + glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl) proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body) if err != nil { glog.Errorf("NewRequest %s: %v", destUrl, err) - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - proxyReq.Header.Set("Host", s3a.option.Filer) proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) - - for header, values := range r.Header { - // handle s3 related headers - passed := false - for _, h := range passThroughHeaders { - if strings.ToLower(header) == h && len(values) > 0 { - proxyReq.Header.Add(header[len("response-"):], values[0]) - passed = true - break - } - } - if passed { - continue - } - // handle other headers - for _, value := range values { - proxyReq.Header.Add(header, value) + for k, v := range r.URL.Query() { + if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok { + proxyReq.Header[k] = v } } + for header, values := range r.Header { + proxyReq.Header[header] = values + } - resp, postErr := client.Do(proxyReq) + // ensure that the Authorization header is overriding any previous + // Authorization header which might be already present in proxyReq + s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite) + resp, postErr := s3a.client.Do(proxyReq) if postErr != nil { glog.Errorf("post to filer: %v", postErr) - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } defer util.CloseResponse(resp) if resp.StatusCode == http.StatusPreconditionFailed { - s3err.WriteErrorResponse(w, s3err.ErrPreconditionFailed, r) + s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed) + return + } + + if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) return } if (resp.ContentLength == -1 || resp.StatusCode == 404) && resp.StatusCode != 304 { if r.Method != "DELETE" { - s3err.WriteErrorResponse(w, s3err.ErrNoSuchKey, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return } } - responseFn(resp, w) - + responseStatusCode := responseFn(resp, w) + s3err.PostLog(r, responseStatusCode, s3err.ErrNone) } -func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) { +func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int) { for k, v := range proxyResponse.Header { w.Header()[k] = v } if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 { w.WriteHeader(http.StatusPartialContent) + statusCode = http.StatusPartialContent } else { - w.WriteHeader(proxyResponse.StatusCode) + statusCode = proxyResponse.StatusCode + } + w.WriteHeader(statusCode) + buf := mem.Allocate(128 * 1024) + defer mem.Free(buf) + if n, err := io.CopyBuffer(w, proxyResponse.Body, buf); err != nil { + glog.V(1).Infof("passthrough response read %d bytes: %v", n, err) } - io.Copy(w, proxyResponse.Body) + return statusCode } -func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code s3err.ErrorCode) { +func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string) (etag string, code s3err.ErrorCode) { hash := md5.New() var body = io.TeeReader(dataReader, hash) @@ -372,16 +400,20 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader return "", s3err.ErrInternalError } - proxyReq.Header.Set("Host", s3a.option.Filer) proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) + if destination != "" { + proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination) + } for header, values := range r.Header { for _, value := range values { proxyReq.Header.Add(header, value) } } - - resp, postErr := client.Do(proxyReq) + // ensure that the Authorization header is overriding any previous + // Authorization header which might be already present in proxyReq + s3a.maybeAddFilerJwtAuthorization(proxyReq, true) + resp, postErr := s3a.client.Do(proxyReq) if postErr != nil { glog.Errorf("post to filer: %v", postErr) @@ -391,7 +423,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader etag = fmt.Sprintf("%x", hash.Sum(nil)) - resp_body, ra_err := ioutil.ReadAll(resp.Body) + resp_body, ra_err := io.ReadAll(resp.Body) if ra_err != nil { glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err) return etag, s3err.ErrInternalError @@ -420,20 +452,33 @@ func setEtag(w http.ResponseWriter, etag string) { } } -func getBucketAndObject(r *http.Request) (bucket, object string) { - vars := mux.Vars(r) - bucket = vars["bucket"] - object = vars["object"] - if !strings.HasPrefix(object, "/") { - object = "/" + object +func filerErrorToS3Error(errString string) s3err.ErrorCode { + switch { + case strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory"): + return s3err.ErrExistingObjectIsDirectory + case strings.HasSuffix(errString, "is a file"): + return s3err.ErrExistingObjectIsFile + default: + return s3err.ErrInternalError } +} - return +func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) { + encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite) + + if encodedJwt == "" { + return + } + + r.Header.Set("Authorization", "BEARER "+string(encodedJwt)) } -func filerErrorToS3Error(errString string) s3err.ErrorCode { - if strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory") { - return s3err.ErrExistingObjectIsDirectory +func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string { + var encodedJwt security.EncodedJwt + if isWrite { + encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec) + } else { + encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec) } - return s3err.ErrInternalError + return string(encodedJwt) } diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go index e1125689f..5704fcf38 100644 --- a/weed/s3api/s3api_object_handlers_postpolicy.go +++ b/weed/s3api/s3api_object_handlers_postpolicy.go @@ -5,16 +5,17 @@ import ( "encoding/base64" "errors" "fmt" - "github.com/chrislusf/seaweedfs/weed/s3api/policy" - "github.com/chrislusf/seaweedfs/weed/s3api/s3err" - "github.com/dustin/go-humanize" - "github.com/gorilla/mux" "io" - "io/ioutil" "mime/multipart" "net/http" "net/url" "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/s3api/policy" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "github.com/dustin/go-humanize" + "github.com/gorilla/mux" ) func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.Request) { @@ -24,25 +25,27 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R bucket := mux.Vars(r)["bucket"] + glog.V(3).Infof("PostPolicyBucketHandler %s", bucket) + reader, err := r.MultipartReader() if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r) + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest) return } form, err := reader.ReadForm(int64(5 * humanize.MiByte)) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r) + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest) return } defer form.RemoveAll() fileBody, fileName, fileSize, formValues, err := extractPostPolicyFormValues(form) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r) + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest) return } if fileBody == nil { - s3err.WriteErrorResponse(w, s3err.ErrPOSTFileRequired, r) + s3err.WriteErrorResponse(w, r, s3err.ErrPOSTFileRequired) return } defer fileBody.Close() @@ -60,7 +63,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R if successRedirect != "" { redirectURL, err = url.Parse(successRedirect) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r) + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest) return } } @@ -68,13 +71,13 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R // Verify policy signature. errCode := s3a.iam.doesPolicySignatureMatch(formValues) if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } policyBytes, err := base64.StdEncoding.DecodeString(formValues.Get("Policy")) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r) + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest) return } @@ -83,7 +86,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R postPolicyForm, err := policy.ParsePostPolicyForm(string(policyBytes)) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrPostPolicyConditionInvalidFormat, r) + s3err.WriteErrorResponse(w, r, s3err.ErrPostPolicyConditionInvalidFormat) return } @@ -99,23 +102,23 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R lengthRange := postPolicyForm.Conditions.ContentLengthRange if lengthRange.Valid { if fileSize < lengthRange.Min { - s3err.WriteErrorResponse(w, s3err.ErrEntityTooSmall, r) + s3err.WriteErrorResponse(w, r, s3err.ErrEntityTooSmall) return } if fileSize > lengthRange.Max { - s3err.WriteErrorResponse(w, s3err.ErrEntityTooLarge, r) + s3err.WriteErrorResponse(w, r, s3err.ErrEntityTooLarge) return } } } - uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) + uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlPathEscape(object)) - etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody) + etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody, "") if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } @@ -123,7 +126,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R // Replace raw query params.. redirectURL.RawQuery = getRedirectPostRawQuery(bucket, object, etag) w.Header().Set("Location", redirectURL.String()) - s3err.WriteEmptyResponse(w, http.StatusSeeOther) + s3err.WriteEmptyResponse(w, r, http.StatusSeeOther) return } @@ -138,18 +141,19 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R ETag: `"` + etag + `"`, Location: w.Header().Get("Location"), } - s3err.WriteXMLResponse(w, http.StatusCreated, resp) + s3err.WriteXMLResponse(w, r, http.StatusCreated, resp) + s3err.PostLog(r, http.StatusCreated, s3err.ErrNone) case "200": - s3err.WriteEmptyResponse(w, http.StatusOK) + s3err.WriteEmptyResponse(w, r, http.StatusOK) default: - writeSuccessResponseEmpty(w) + writeSuccessResponseEmpty(w, r) } } // Extract form fields and file data from a HTTP POST Policy func extractPostPolicyFormValues(form *multipart.Form) (filePart io.ReadCloser, fileName string, fileSize int64, formValues http.Header, err error) { - /// HTML Form values + // / HTML Form values fileName = "" // Canonicalize the form values into http.Header. @@ -172,7 +176,7 @@ func extractPostPolicyFormValues(form *multipart.Form) (filePart io.ReadCloser, b.WriteString(v) } fileSize = int64(b.Len()) - filePart = ioutil.NopCloser(b) + filePart = io.NopCloser(b) return filePart, fileName, fileSize, formValues, nil } diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go index 308d9a5f5..768f4d180 100644 --- a/weed/s3api/s3api_object_multipart_handlers.go +++ b/weed/s3api/s3api_object_multipart_handlers.go @@ -1,15 +1,20 @@ package s3api import ( + "crypto/sha1" + "encoding/xml" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/s3api/s3err" - weed_server "github.com/chrislusf/seaweedfs/weed/server" + "io" "net/http" "net/url" "strconv" "strings" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + weed_server "github.com/chrislusf/seaweedfs/weed/server" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" ) @@ -23,7 +28,7 @@ const ( // NewMultipartUploadHandler - New multipart upload. func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) createMultipartUploadInput := &s3.CreateMultipartUploadInput{ Bucket: aws.String(bucket), @@ -36,49 +41,71 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http createMultipartUploadInput.Metadata[k] = aws.String(string(v)) } + contentType := r.Header.Get("Content-Type") + if contentType != "" { + createMultipartUploadInput.ContentType = &contentType + } response, errCode := s3a.createMultipartUpload(createMultipartUploadInput) glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode) if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } - writeSuccessResponseXML(w, response) + writeSuccessResponseXML(w, r, response) } // CompleteMultipartUploadHandler - Completes multipart upload. func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html + + bucket, object := s3_constants.GetBucketAndObject(r) + + parts := &CompleteMultipartUpload{} + if err := xmlDecoder(r.Body, parts, r.ContentLength); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML) + return + } // Get upload id. uploadID, _, _, _ := getObjectResources(r.URL.Query()) + err := s3a.checkUploadId(object, uploadID) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) + return + } response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{ Bucket: aws.String(bucket), Key: objectKey(aws.String(object)), UploadId: aws.String(uploadID), - }) + }, parts) glog.V(2).Info("CompleteMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode) if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } - writeSuccessResponseXML(w, response) + writeSuccessResponseXML(w, r, response) } // AbortMultipartUploadHandler - Aborts multipart upload. func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) // Get upload id. uploadID, _, _, _ := getObjectResources(r.URL.Query()) + err := s3a.checkUploadId(object, uploadID) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) + return + } response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{ Bucket: aws.String(bucket), @@ -87,29 +114,31 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht }) if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } glog.V(2).Info("AbortMultipartUploadHandler", string(s3err.EncodeXMLResponse(response))) - writeSuccessResponseXML(w, response) + //https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html + s3err.WriteXMLResponse(w, r, http.StatusNoContent, response) + s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone) } // ListMultipartUploadsHandler - Lists multipart uploads. func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) { - bucket, _ := getBucketAndObject(r) + bucket, _ := s3_constants.GetBucketAndObject(r) prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query()) if maxUploads < 0 { - s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxUploads, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxUploads) return } if keyMarker != "" { // Marker not common with prefix is not implemented. if !strings.HasPrefix(keyMarker, prefix) { - s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented) return } } @@ -124,29 +153,35 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht UploadIdMarker: aws.String(uploadIDMarker), }) - glog.V(2).Info("ListMultipartUploadsHandler", string(s3err.EncodeXMLResponse(response)), errCode) + glog.V(2).Infof("ListMultipartUploadsHandler %s errCode=%d", string(s3err.EncodeXMLResponse(response)), errCode) if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } // TODO handle encodingType - writeSuccessResponseXML(w, response) + writeSuccessResponseXML(w, r, response) } // ListObjectPartsHandler - Lists object parts in a multipart upload. func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query()) if partNumberMarker < 0 { - s3err.WriteErrorResponse(w, s3err.ErrInvalidPartNumberMarker, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPartNumberMarker) return } if maxParts < 0 { - s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxParts, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts) + return + } + + err := s3a.checkUploadId(object, uploadID) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) return } @@ -158,36 +193,36 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re UploadId: aws.String(uploadID), }) - glog.V(2).Info("ListObjectPartsHandler", string(s3err.EncodeXMLResponse(response)), errCode) - if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } - writeSuccessResponseXML(w, response) + glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part)) + + writeSuccessResponseXML(w, r, response) } // PutObjectPartHandler - Put an object part in a multipart upload. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { - bucket, _ := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) uploadID := r.URL.Query().Get("uploadId") - exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true) - if !exists { - s3err.WriteErrorResponse(w, s3err.ErrNoSuchUpload, r) + err := s3a.checkUploadId(object, uploadID) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) return } partIDString := r.URL.Query().Get("partNumber") partID, err := strconv.Atoi(partIDString) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInvalidPart, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) return } if partID > globalMaxPartID { - s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxParts, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts) return } @@ -204,25 +239,31 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) } if s3ErrCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, s3ErrCode, r) + s3err.WriteErrorResponse(w, r, s3ErrCode) return } } defer dataReader.Close() + glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID) + uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s", - s3a.option.Filer, s3a.genUploadsFolder(bucket), uploadID, partID, bucket) + s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, bucket) - etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader) + if partID == 1 && r.Header.Get("Content-Type") == "" { + dataReader = mimeDetect(r, dataReader) + } + destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination) if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, errCode, r) + s3err.WriteErrorResponse(w, r, errCode) return } setEtag(w, etag) - writeSuccessResponseEmpty(w) + writeSuccessResponseEmpty(w, r) } @@ -230,6 +271,27 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string { return fmt.Sprintf("%s/%s/.uploads", s3a.option.BucketsPath, bucket) } +// Generate uploadID hash string from object +func (s3a *S3ApiServer) generateUploadID(object string) string { + if strings.HasPrefix(object, "/") { + object = object[1:] + } + h := sha1.New() + h.Write([]byte(object)) + return fmt.Sprintf("%x", h.Sum(nil)) +} + +//Check object name and uploadID when processing multipart uploading +func (s3a *S3ApiServer) checkUploadId(object string, id string) error { + + hash := s3a.generateUploadID(object) + if hash != id { + glog.Errorf("object %s and uploadID %s are not matched", object, id) + return fmt.Errorf("object %s and uploadID %s are not matched", object, id) + } + return nil +} + // Parse bucket url queries for ?uploads func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) { prefix = values.Get("prefix") @@ -258,8 +320,24 @@ func getObjectResources(values url.Values) (uploadID string, partNumberMarker, m return } -type byCompletedPartNumber []*s3.CompletedPart +func xmlDecoder(body io.Reader, v interface{}, size int64) error { + var lbody io.Reader + if size > 0 { + lbody = io.LimitReader(body, size) + } else { + lbody = body + } + d := xml.NewDecoder(lbody) + d.CharsetReader = func(label string, input io.Reader) (io.Reader, error) { + return input, nil + } + return d.Decode(v) +} -func (a byCompletedPartNumber) Len() int { return len(a) } -func (a byCompletedPartNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byCompletedPartNumber) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber } +type CompleteMultipartUpload struct { + Parts []CompletedPart `xml:"Part"` +} +type CompletedPart struct { + ETag string + PartNumber int +} diff --git a/weed/s3api/s3api_object_skip_handlers.go b/weed/s3api/s3api_object_skip_handlers.go new file mode 100644 index 000000000..160d02475 --- /dev/null +++ b/weed/s3api/s3api_object_skip_handlers.go @@ -0,0 +1,45 @@ +package s3api + +import ( + "net/http" +) + +// GetObjectAclHandler Put object ACL +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAcl.html +func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Request) { + + w.WriteHeader(http.StatusNoContent) + +} + +// PutObjectAclHandler Put object ACL +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectAcl.html +func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Request) { + + w.WriteHeader(http.StatusNoContent) + +} + +// PutObjectRetentionHandler Put object Retention +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectRetention.html +func (s3a *S3ApiServer) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Request) { + + w.WriteHeader(http.StatusNoContent) + +} + +// PutObjectLegalHoldHandler Put object Legal Hold +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectLegalHold.html +func (s3a *S3ApiServer) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) { + + w.WriteHeader(http.StatusNoContent) + +} + +// PutObjectLockConfigurationHandler Put object Lock configuration +// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectLockConfiguration.html +func (s3a *S3ApiServer) PutObjectLockConfigurationHandler(w http.ResponseWriter, r *http.Request) { + + w.WriteHeader(http.StatusNoContent) + +} diff --git a/weed/s3api/s3api_object_tagging_handlers.go b/weed/s3api/s3api_object_tagging_handlers.go index fd3ec2ff7..9fde0309c 100644 --- a/weed/s3api/s3api_object_tagging_handlers.go +++ b/weed/s3api/s3api_object_tagging_handlers.go @@ -3,20 +3,22 @@ package s3api import ( "encoding/xml" "fmt" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "io" + "net/http" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" "github.com/chrislusf/seaweedfs/weed/util" - "io" - "io/ioutil" - "net/http" ) // GetObjectTaggingHandler - GET object tagging // API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectTagging.html func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("GetObjectTaggingHandler %s %s", bucket, object) target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)) dir, name := target.DirAndName() @@ -25,15 +27,15 @@ func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.R if err != nil { if err == filer_pb.ErrNotFound { glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err) - s3err.WriteErrorResponse(w, s3err.ErrNoSuchKey, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) } else { glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err) - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) } return } - writeSuccessResponseXML(w, FromTags(tags)) + writeSuccessResponseXML(w, r, FromTags(tags)) } @@ -41,38 +43,39 @@ func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.R // API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("PutObjectTaggingHandler %s %s", bucket, object) target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)) dir, name := target.DirAndName() tagging := &Tagging{} - input, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength)) + input, err := io.ReadAll(io.LimitReader(r.Body, r.ContentLength)) if err != nil { glog.Errorf("PutObjectTaggingHandler read input %s: %v", r.URL, err) - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } if err = xml.Unmarshal(input, tagging); err != nil { glog.Errorf("PutObjectTaggingHandler Unmarshal %s: %v", r.URL, err) - s3err.WriteErrorResponse(w, s3err.ErrMalformedXML, r) + s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML) return } tags := tagging.ToTags() if len(tags) > 10 { glog.Errorf("PutObjectTaggingHandler tags %s: %d tags more than 10", r.URL, len(tags)) - s3err.WriteErrorResponse(w, s3err.ErrInvalidTag, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag) return } for k, v := range tags { if len(k) > 128 { glog.Errorf("PutObjectTaggingHandler tags %s: tag key %s longer than 128", r.URL, k) - s3err.WriteErrorResponse(w, s3err.ErrInvalidTag, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag) return } if len(v) > 256 { glog.Errorf("PutObjectTaggingHandler tags %s: tag value %s longer than 256", r.URL, v) - s3err.WriteErrorResponse(w, s3err.ErrInvalidTag, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag) return } } @@ -80,23 +83,24 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R if err = s3a.setTags(dir, name, tagging.ToTags()); err != nil { if err == filer_pb.ErrNotFound { glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err) - s3err.WriteErrorResponse(w, s3err.ErrNoSuchKey, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) } else { glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err) - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) } return } - w.WriteHeader(http.StatusNoContent) - + w.WriteHeader(http.StatusOK) + s3err.PostLog(r, http.StatusOK, s3err.ErrNone) } // DeleteObjectTaggingHandler Delete object tagging // API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjectTagging.html func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) { - bucket, object := getBucketAndObject(r) + bucket, object := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("DeleteObjectTaggingHandler %s %s", bucket, object) target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)) dir, name := target.DirAndName() @@ -105,13 +109,14 @@ func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *htt if err != nil { if err == filer_pb.ErrNotFound { glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err) - s3err.WriteErrorResponse(w, s3err.ErrNoSuchKey, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) } else { glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err) - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) } return } w.WriteHeader(http.StatusNoContent) + s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone) } diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 51a58af6a..6b934bccd 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -5,6 +5,7 @@ import ( "encoding/xml" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "io" "net/http" "net/url" @@ -15,7 +16,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" ) @@ -39,16 +39,17 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html // collect parameters - bucket, _ := getBucketAndObject(r) + bucket, _ := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("ListObjectsV2Handler %s", bucket) originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query()) if maxKeys < 0 { - s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxKeys, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys) return } if delimiter != "" && delimiter != "/" { - s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented) return } @@ -60,13 +61,13 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } if len(response.Contents) == 0 { if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists { - s3err.WriteErrorResponse(w, s3err.ErrNoSuchBucket, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } } @@ -86,7 +87,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ StartAfter: startAfter, } - writeSuccessResponseXML(w, responseV2) + writeSuccessResponseXML(w, r, responseV2) } func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) { @@ -94,34 +95,35 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html // collect parameters - bucket, _ := getBucketAndObject(r) + bucket, _ := s3_constants.GetBucketAndObject(r) + glog.V(3).Infof("ListObjectsV1Handler %s", bucket) originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query()) if maxKeys < 0 { - s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxKeys, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys) return } if delimiter != "" && delimiter != "/" { - s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented) return } response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter) if err != nil { - s3err.WriteErrorResponse(w, s3err.ErrInternalError, r) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } if len(response.Contents) == 0 { if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists { - s3err.WriteErrorResponse(w, s3err.ErrNoSuchBucket, r) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } } - writeSuccessResponseXML(w, response) + writeSuccessResponseXML(w, r, response) } func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) { @@ -131,10 +133,10 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m reqDir = reqDir[1:] } bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket) + bucketPrefixLen := len(bucketPrefix) reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir) if strings.HasSuffix(reqDir, "/") { - // remove trailing "/" - reqDir = reqDir[:len(reqDir)-1] + reqDir = strings.TrimSuffix(reqDir, "/") } var contents []ListEntry @@ -144,33 +146,36 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m var nextMarker string // check filer - err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) { + _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, false, false, bucketPrefixLen, func(dir string, entry *filer_pb.Entry) { if entry.IsDirectory { if delimiter == "/" { commonPrefixes = append(commonPrefixes, PrefixEntry{ - Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):], + Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[bucketPrefixLen:], }) } - } else { - storageClass := "STANDARD" - if v, ok := entry.Extended[xhttp.AmzStorageClass]; ok { - storageClass = string(v) + if !(entry.IsDirectoryKeyObject() && strings.HasSuffix(entry.Name, "/")) { + return } - contents = append(contents, ListEntry{ - Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):], - LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), - ETag: "\"" + filer.ETag(entry) + "\"", - Size: int64(filer.FileSize(entry)), - Owner: CanonicalUser{ - ID: fmt.Sprintf("%x", entry.Attributes.Uid), - DisplayName: entry.Attributes.UserName, - }, - StorageClass: StorageClass(storageClass), - }) } + storageClass := "STANDARD" + if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok { + storageClass = string(v) + } + contents = append(contents, ListEntry{ + Key: fmt.Sprintf("%s/%s", dir, entry.Name)[bucketPrefixLen:], + LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), + ETag: "\"" + filer.ETag(entry) + "\"", + Size: int64(filer.FileSize(entry)), + Owner: CanonicalUser{ + ID: fmt.Sprintf("%x", entry.Attributes.Uid), + DisplayName: entry.Attributes.UserName, + }, + StorageClass: StorageClass(storageClass), + }) }) + glog.V(4).Infof("end doListFilerEntries isTruncated:%v nextMarker:%v reqDir: %v prefix: %v", isTruncated, nextMarker, reqDir, prefix) if doErr != nil { return doErr } @@ -179,6 +184,39 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m nextMarker = "" } + if len(contents) == 0 && len(commonPrefixes) == 0 && maxKeys > 0 { + if strings.HasSuffix(originalPrefix, "/") && prefix == "" { + reqDir, prefix = filepath.Split(strings.TrimSuffix(reqDir, "/")) + reqDir = strings.TrimSuffix(reqDir, "/") + } + _, _, _, doErr = s3a.doListFilerEntries(client, reqDir, prefix, 1, prefix, delimiter, true, false, bucketPrefixLen, func(dir string, entry *filer_pb.Entry) { + if entry.IsDirectoryKeyObject() && entry.Name == prefix { + storageClass := "STANDARD" + if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok { + storageClass = string(v) + } + contents = append(contents, ListEntry{ + Key: fmt.Sprintf("%s/%s/", dir, entry.Name)[bucketPrefixLen:], + LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), + ETag: "\"" + fmt.Sprintf("%x", entry.Attributes.Md5) + "\"", + Size: int64(filer.FileSize(entry)), + Owner: CanonicalUser{ + ID: fmt.Sprintf("%x", entry.Attributes.Uid), + DisplayName: entry.Attributes.UserName, + }, + StorageClass: StorageClass(storageClass), + }) + } + }) + if doErr != nil { + return doErr + } + } + + if len(nextMarker) > 0 { + nextMarker = nextMarker[bucketPrefixLen:] + } + response = ListBucketResult{ Name: bucket, Prefix: originalPrefix, @@ -197,7 +235,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m return } -func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) { +func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, inclusiveStartFrom bool, subEntries bool, bucketPrefixLen int, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) { // invariants // prefix and marker should be under dir, marker may contain "/" // maxKeys should be updated for each recursion @@ -210,19 +248,30 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d } if strings.Contains(marker, "/") { + if strings.HasSuffix(marker, "/") { + marker = strings.TrimSuffix(marker, "/") + } sepIndex := strings.Index(marker, "/") - subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:] - // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys) - subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn) - if subErr != nil { - err = subErr - return + if sepIndex != -1 { + subPrefix, subMarker := marker[0:sepIndex], marker[sepIndex+1:] + subDir := fmt.Sprintf("%s/%s", dir[0:bucketPrefixLen-1], subPrefix) + if strings.HasPrefix(subDir, dir) { + subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, subDir, "", maxKeys, subMarker, delimiter, false, false, bucketPrefixLen, eachEntryFn) + if subErr != nil { + err = subErr + return + } + counter += subCounter + isTruncated = isTruncated || subIsTruncated + maxKeys -= subCounter + nextMarker = subNextMarker + // finished processing this sub directory + marker = subPrefix + } } - isTruncated = isTruncated || subIsTruncated - maxKeys -= subCounter - nextMarker = subDir + "/" + subNextMarker - // finished processing this sub directory - marker = subDir + } + if maxKeys <= 0 { + return } // now marker is also a direct child of dir @@ -231,7 +280,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d Prefix: prefix, Limit: uint32(maxKeys + 1), StartFromFileName: marker, - InclusiveStartFrom: false, + InclusiveStartFrom: inclusiveStartFrom, } ctx, cancel := context.WithCancel(context.Background()) @@ -257,39 +306,46 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d return } entry := resp.Entry - nextMarker = entry.Name + nextMarker = dir + "/" + entry.Name if entry.IsDirectory { // println("ListEntries", dir, "dir:", entry.Name) - if entry.Name != ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys - if delimiter != "/" { + if entry.Name == ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys + continue + } + if delimiter == "" { + eachEntryFn(dir, entry) + // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter) + subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, false, true, bucketPrefixLen, eachEntryFn) + if subErr != nil { + err = fmt.Errorf("doListFilerEntries2: %v", subErr) + return + } + // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated) + if subCounter == 0 && entry.IsDirectoryKeyObject() { + entry.Name += "/" eachEntryFn(dir, entry) - // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter) - subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn) - if subErr != nil { - err = fmt.Errorf("doListFilerEntries2: %v", subErr) - return - } - // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated) - counter += subCounter - nextMarker = entry.Name + "/" + subNextMarker - if subIsTruncated { - isTruncated = true - return - } - } else { - var isEmpty bool - if !s3a.option.AllowEmptyFolder { - if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil { - glog.Errorf("check empty folder %s: %v", dir, err) - } - } - if !isEmpty { - eachEntryFn(dir, entry) - counter++ + counter++ + } + counter += subCounter + nextMarker = subNextMarker + if subIsTruncated { + isTruncated = true + return + } + } else if delimiter == "/" { + var isEmpty bool + if !s3a.option.AllowEmptyFolder && !entry.IsDirectoryKeyObject() { + if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil { + glog.Errorf("check empty folder %s: %v", dir, err) } } + if !isEmpty { + nextMarker += "/" + eachEntryFn(dir, entry) + counter++ + } } - } else { + } else if !(delimiter == "/" && subEntries) { // println("ListEntries", dir, "file:", entry.Name) eachEntryFn(dir, entry) counter++ diff --git a/weed/s3api/s3api_policy.go b/weed/s3api/s3api_policy.go new file mode 100644 index 000000000..6e2c8cfa2 --- /dev/null +++ b/weed/s3api/s3api_policy.go @@ -0,0 +1,148 @@ +package s3api + +import ( + "encoding/xml" + "time" +) + +// Status represents lifecycle configuration status +type ruleStatus string + +// Supported status types +const ( + Enabled ruleStatus = "Enabled" + Disabled ruleStatus = "Disabled" +) + +// Lifecycle - Configuration for bucket lifecycle. +type Lifecycle struct { + XMLName xml.Name `xml:"LifecycleConfiguration"` + Rules []Rule `xml:"Rule"` +} + +// Rule - a rule for lifecycle configuration. +type Rule struct { + XMLName xml.Name `xml:"Rule"` + ID string `xml:"ID,omitempty"` + Status ruleStatus `xml:"Status"` + Filter Filter `xml:"Filter,omitempty"` + Prefix Prefix `xml:"Prefix,omitempty"` + Expiration Expiration `xml:"Expiration,omitempty"` + Transition Transition `xml:"Transition,omitempty"` +} + +// Filter - a filter for a lifecycle configuration Rule. +type Filter struct { + XMLName xml.Name `xml:"Filter"` + set bool + + Prefix Prefix + + And And + andSet bool + + Tag Tag + tagSet bool +} + +// Prefix holds the prefix xml tag in <Rule> and <Filter> +type Prefix struct { + string + set bool +} + +// MarshalXML encodes Prefix field into an XML form. +func (p Prefix) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error { + if !p.set { + return nil + } + return e.EncodeElement(p.string, startElement) +} + +// MarshalXML encodes Filter field into an XML form. +func (f Filter) MarshalXML(e *xml.Encoder, start xml.StartElement) error { + if err := e.EncodeToken(start); err != nil { + return err + } + if err := e.EncodeElement(f.Prefix, xml.StartElement{Name: xml.Name{Local: "Prefix"}}); err != nil { + return err + } + return e.EncodeToken(xml.EndElement{Name: start.Name}) +} + +// And - a tag to combine a prefix and multiple tags for lifecycle configuration rule. +type And struct { + XMLName xml.Name `xml:"And"` + Prefix Prefix `xml:"Prefix,omitempty"` + Tags []Tag `xml:"Tag,omitempty"` +} + +// Expiration - expiration actions for a rule in lifecycle configuration. +type Expiration struct { + XMLName xml.Name `xml:"Expiration"` + Days int `xml:"Days,omitempty"` + Date ExpirationDate `xml:"Date,omitempty"` + DeleteMarker ExpireDeleteMarker `xml:"ExpiredObjectDeleteMarker"` + + set bool +} + +// MarshalXML encodes expiration field into an XML form. +func (e Expiration) MarshalXML(enc *xml.Encoder, startElement xml.StartElement) error { + if !e.set { + return nil + } + type expirationWrapper Expiration + return enc.EncodeElement(expirationWrapper(e), startElement) +} + +// ExpireDeleteMarker represents value of ExpiredObjectDeleteMarker field in Expiration XML element. +type ExpireDeleteMarker struct { + val bool + set bool +} + +// MarshalXML encodes delete marker boolean into an XML form. +func (b ExpireDeleteMarker) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error { + if !b.set { + return nil + } + return e.EncodeElement(b.val, startElement) +} + +// ExpirationDate is a embedded type containing time.Time to unmarshal +// Date in Expiration +type ExpirationDate struct { + time.Time +} + +// MarshalXML encodes expiration date if it is non-zero and encodes +// empty string otherwise +func (eDate ExpirationDate) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error { + if eDate.Time.IsZero() { + return nil + } + return e.EncodeElement(eDate.Format(time.RFC3339), startElement) +} + +// Transition - transition actions for a rule in lifecycle configuration. +type Transition struct { + XMLName xml.Name `xml:"Transition"` + Days int `xml:"Days,omitempty"` + Date time.Time `xml:"Date,omitempty"` + StorageClass string `xml:"StorageClass,omitempty"` + + set bool +} + +// MarshalXML encodes transition field into an XML form. +func (t Transition) MarshalXML(enc *xml.Encoder, start xml.StartElement) error { + if !t.set { + return nil + } + type transitionWrapper Transition + return enc.EncodeElement(transitionWrapper(t), start) +} + +// TransitionDays is a type alias to unmarshal Days in Transition +type TransitionDays int diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 27259c4a7..cc5ca5231 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -1,44 +1,81 @@ package s3api import ( + "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" + "net" "net/http" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb" . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" "google.golang.org/grpc" ) type S3ApiServerOption struct { - Filer string - Port int - FilerGrpcAddress string - Config string - DomainName string - BucketsPath string - GrpcDialOption grpc.DialOption - AllowEmptyFolder bool + Filer pb.ServerAddress + Port int + Config string + DomainName string + BucketsPath string + GrpcDialOption grpc.DialOption + AllowEmptyFolder bool + AllowDeleteBucketNotEmpty bool + LocalFilerSocket *string } type S3ApiServer struct { - option *S3ApiServerOption - iam *IdentityAccessManagement + s3_pb.UnimplementedSeaweedS3Server + option *S3ApiServerOption + iam *IdentityAccessManagement + cb *CircuitBreaker + randomClientId int32 + filerGuard *security.Guard + client *http.Client } func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { + v := util.GetViper() + signingKey := v.GetString("jwt.filer_signing.key") + v.SetDefault("jwt.filer_signing.expires_after_seconds", 10) + expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds") + + readSigningKey := v.GetString("jwt.filer_signing.read.key") + v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60) + readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds") + s3ApiServer = &S3ApiServer{ - option: option, - iam: NewIdentityAccessManagement(option), + option: option, + iam: NewIdentityAccessManagement(option), + randomClientId: util.RandomInt32(), + filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), + cb: NewCircuitBreaker(option), + } + if option.LocalFilerSocket == nil || *option.LocalFilerSocket == "" { + s3ApiServer.client = &http.Client{Transport: &http.Transport{ + MaxIdleConns: 1024, + MaxIdleConnsPerHost: 1024, + }} + } else { + s3ApiServer.client = &http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", *option.LocalFilerSocket) + }, + }, + } } s3ApiServer.registerRouter(router) - go s3ApiServer.subscribeMetaEvents("s3", filer.IamConfigDirecotry+"/"+filer.IamIdentityFile, time.Now().UnixNano()) - + go s3ApiServer.subscribeMetaEvents("s3", filer.DirectoryEtcRoot, time.Now().UnixNano()) return s3ApiServer, nil } @@ -63,73 +100,126 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { for _, bucket := range routers { - // HeadObject - bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.HeadObjectHandler, ACTION_READ), "GET")) - // HeadBucket - bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.HeadBucketHandler, ACTION_ADMIN), "GET")) + // each case should follow the next rule: + // - requesting object with query must precede any other methods + // - requesting object must precede any methods with buckets + // - requesting bucket with query must precede raw methods with buckets + // - requesting bucket must be processed in the end + + // objects with query // CopyObjectPart - bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectPartHandler, ACTION_WRITE), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") + bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") // PutObjectPart - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectPartHandler, ACTION_WRITE), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") // CompleteMultipartUpload - bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.CompleteMultipartUploadHandler, ACTION_WRITE), "POST")).Queries("uploadId", "{uploadId:.*}") + bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CompleteMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploadId", "{uploadId:.*}") // NewMultipartUpload - bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.NewMultipartUploadHandler, ACTION_WRITE), "POST")).Queries("uploads", "") + bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.NewMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploads", "") // AbortMultipartUpload - bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.AbortMultipartUploadHandler, ACTION_WRITE), "DELETE")).Queries("uploadId", "{uploadId:.*}") + bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.AbortMultipartUploadHandler, ACTION_WRITE)), "DELETE")).Queries("uploadId", "{uploadId:.*}") // ListObjectParts - bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectPartsHandler, ACTION_READ), "GET")).Queries("uploadId", "{uploadId:.*}") + bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectPartsHandler, ACTION_READ)), "GET")).Queries("uploadId", "{uploadId:.*}") // ListMultipartUploads - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_READ), "GET")).Queries("uploads", "") + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListMultipartUploadsHandler, ACTION_READ)), "GET")).Queries("uploads", "") // GetObjectTagging - bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectTaggingHandler, ACTION_READ), "GET")).Queries("tagging", "") + bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectTaggingHandler, ACTION_READ)), "GET")).Queries("tagging", "") // PutObjectTagging - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectTaggingHandler, ACTION_TAGGING), "PUT")).Queries("tagging", "") + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectTaggingHandler, ACTION_TAGGING)), "PUT")).Queries("tagging", "") // DeleteObjectTagging - bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING), "DELETE")).Queries("tagging", "") + bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING)), "DELETE")).Queries("tagging", "") + + // PutObjectACL + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "") + // PutObjectRetention + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectRetentionHandler, ACTION_WRITE)), "PUT")).Queries("retention", "") + // PutObjectLegalHold + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLegalHoldHandler, ACTION_WRITE)), "PUT")).Queries("legal-hold", "") + // PutObjectLockConfiguration + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("object-lock", "") + + // GetObjectACL + bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectAclHandler, ACTION_READ)), "GET")).Queries("acl", "") + + // objects with query + + // raw objects + + // HeadObject + bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadObjectHandler, ACTION_READ)), "GET")) + + // GetObject, but directory listing is not supported + bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectHandler, ACTION_READ)), "GET")) // CopyObject - bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY")) + bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectHandler, ACTION_WRITE)), "COPY")) // PutObject - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectHandler, ACTION_WRITE), "PUT")) - // PutBucket - bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketHandler, ACTION_ADMIN), "PUT")) - + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectHandler, ACTION_WRITE)), "PUT")) // DeleteObject - bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectHandler, ACTION_WRITE), "DELETE")) - // DeleteBucket - bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketHandler, ACTION_WRITE), "DELETE")) + bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectHandler, ACTION_WRITE)), "DELETE")) + + // raw objects + + // buckets with query + + // DeleteMultipleObjects + bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE)), "DELETE")).Queries("delete", "") + + // GetBucketACL + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketAclHandler, ACTION_READ)), "GET")).Queries("acl", "") + // PutBucketACL + bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "") + + // GetBucketPolicy + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketPolicyHandler, ACTION_READ)), "GET")).Queries("policy", "") + // PutBucketPolicy + bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketPolicyHandler, ACTION_WRITE)), "PUT")).Queries("policy", "") + // DeleteBucketPolicy + bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketPolicyHandler, ACTION_WRITE)), "DELETE")).Queries("policy", "") + + // GetBucketCors + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketCorsHandler, ACTION_READ)), "GET")).Queries("cors", "") + // PutBucketCors + bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketCorsHandler, ACTION_WRITE)), "PUT")).Queries("cors", "") + // DeleteBucketCors + bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketCorsHandler, ACTION_WRITE)), "DELETE")).Queries("cors", "") + + // GetBucketLifecycleConfiguration + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLifecycleConfigurationHandler, ACTION_READ)), "GET")).Queries("lifecycle", "") + // PutBucketLifecycleConfiguration + bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketLifecycleConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("lifecycle", "") + // DeleteBucketLifecycleConfiguration + bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketLifecycleHandler, ACTION_WRITE)), "DELETE")).Queries("lifecycle", "") + + // GetBucketLocation + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLocationHandler, ACTION_READ)), "GET")).Queries("location", "") + + // GetBucketRequestPayment + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketRequestPaymentHandler, ACTION_READ)), "GET")).Queries("requestPayment", "") // ListObjectsV2 - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV2Handler, ACTION_LIST), "LIST")).Queries("list-type", "2") - // GetObject, but directory listing is not supported - bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectHandler, ACTION_READ), "GET")) - // ListObjectsV1 (Legacy) - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV1Handler, ACTION_LIST), "LIST")) + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV2Handler, ACTION_LIST)), "LIST")).Queries("list-type", "2") + + // buckets with query + + // raw buckets // PostPolicy - bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.PostPolicyBucketHandler, ACTION_WRITE), "POST")) + bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PostPolicyBucketHandler, ACTION_WRITE)), "POST")) - // DeleteMultipleObjects - bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE), "DELETE")).Queries("delete", "") - /* - - // not implemented - // GetBucketLocation - bucket.Methods("GET").HandlerFunc(s3a.GetBucketLocationHandler).Queries("location", "") - // GetBucketPolicy - bucket.Methods("GET").HandlerFunc(s3a.GetBucketPolicyHandler).Queries("policy", "") - // GetObjectACL - bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(s3a.GetObjectACLHandler).Queries("acl", "") - // GetBucketACL - bucket.Methods("GET").HandlerFunc(s3a.GetBucketACLHandler).Queries("acl", "") - // PutBucketPolicy - bucket.Methods("PUT").HandlerFunc(s3a.PutBucketPolicyHandler).Queries("policy", "") - // DeleteBucketPolicy - bucket.Methods("DELETE").HandlerFunc(s3a.DeleteBucketPolicyHandler).Queries("policy", "") - */ + // HeadBucket + bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadBucketHandler, ACTION_READ)), "GET")) + + // PutBucket + bucket.Methods("PUT").HandlerFunc(track(s3a.PutBucketHandler, "PUT")) + // DeleteBucket + bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_WRITE)), "DELETE")) + + // ListObjectsV1 (Legacy) + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST")) + + // raw buckets } diff --git a/weed/s3api/s3api_server_grpc.go b/weed/s3api/s3api_server_grpc.go new file mode 100644 index 000000000..e93d0056f --- /dev/null +++ b/weed/s3api/s3api_server_grpc.go @@ -0,0 +1,16 @@ +package s3api + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" +) + +func (s3a *S3ApiServer) Configure(ctx context.Context, request *s3_pb.S3ConfigureRequest) (*s3_pb.S3ConfigureResponse, error) { + + if err := s3a.iam.LoadS3ApiConfigurationFromBytes(request.S3ConfigurationFileContent); err != nil { + return nil, err + } + + return &s3_pb.S3ConfigureResponse{}, nil + +} diff --git a/weed/s3api/s3api_status_handlers.go b/weed/s3api/s3api_status_handlers.go index 914c27f40..fafb6ac2f 100644 --- a/weed/s3api/s3api_status_handlers.go +++ b/weed/s3api/s3api_status_handlers.go @@ -1,8 +1,11 @@ package s3api -import "net/http" +import ( + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "net/http" +) func (s3a *S3ApiServer) StatusHandler(w http.ResponseWriter, r *http.Request) { // write out the response code and content type header - writeSuccessResponseEmpty(w) + s3err.WriteResponse(w, r, http.StatusOK, []byte{}, "") } diff --git a/weed/s3api/s3api_xsd_generated.go b/weed/s3api/s3api_xsd_generated.go index 9d62afc4e..dd6a32ff2 100644 --- a/weed/s3api/s3api_xsd_generated.go +++ b/weed/s3api/s3api_xsd_generated.go @@ -8,12 +8,12 @@ import ( ) type AccessControlList struct { - Grant []Grant `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Grant,omitempty"` + Grant []Grant `xml:"Grant,omitempty"` } type AccessControlPolicy struct { - Owner CanonicalUser `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Owner"` - AccessControlList AccessControlList `xml:"http://s3.amazonaws.com/doc/2006-03-01/ AccessControlList"` + Owner CanonicalUser `xml:"Owner"` + AccessControlList AccessControlList `xml:"AccessControlList"` } type AmazonCustomerByEmail struct { @@ -467,11 +467,17 @@ func (t *GetObjectResult) UnmarshalXML(d *xml.Decoder, start xml.StartElement) e } type Grant struct { - Grantee Grantee `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Grantee"` - Permission Permission `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Permission"` + Grantee Grantee `xml:"Grantee"` + Permission Permission `xml:"Permission"` } type Grantee struct { + XMLNS string `xml:"xmlns:xsi,attr"` + XMLXSI string `xml:"xsi:type,attr"` + Type string `xml:"Type"` + ID string `xml:"ID,omitempty"` + DisplayName string `xml:"DisplayName,omitempty"` + URI string `xml:"URI,omitempty"` } type Group struct { @@ -640,6 +646,10 @@ type ListVersionsResult struct { CommonPrefixes []PrefixEntry `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CommonPrefixes,omitempty"` } +type LocationConstraint struct { + LocationConstraint string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LocationConstraint"` +} + type LoggingSettings struct { TargetBucket string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ TargetBucket"` TargetPrefix string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ TargetPrefix"` diff --git a/weed/s3api/s3err/audit_fluent.go b/weed/s3api/s3err/audit_fluent.go new file mode 100644 index 000000000..2deb56896 --- /dev/null +++ b/weed/s3api/s3err/audit_fluent.go @@ -0,0 +1,183 @@ +package s3err + +import ( + "encoding/json" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/fluent/fluent-logger-golang/fluent" + "net/http" + "os" + "time" +) + +type AccessLogExtend struct { + AccessLog + AccessLogHTTP +} + +type AccessLog struct { + Bucket string `msg:"bucket" json:"bucket"` // awsexamplebucket1 + Time int64 `msg:"time" json:"time"` // [06/Feb/2019:00:00:38 +0000] + RemoteIP string `msg:"remote_ip" json:"remote_ip,omitempty"` // 192.0.2.3 + Requester string `msg:"requester" json:"requester,omitempty"` // IAM user id + RequestID string `msg:"request_id" json:"request_id,omitempty"` // 3E57427F33A59F07 + Operation string `msg:"operation" json:"operation,omitempty"` // REST.HTTP_method.resource_type REST.PUT.OBJECT + Key string `msg:"key" json:"key,omitempty"` // /photos/2019/08/puppy.jpg + ErrorCode string `msg:"error_code" json:"error_code,omitempty"` + HostId string `msg:"host_id" json:"host_id,omitempty"` + HostHeader string `msg:"host_header" json:"host_header,omitempty"` // s3.us-west-2.amazonaws.com + UserAgent string `msg:"user_agent" json:"user_agent,omitempty"` + HTTPStatus int `msg:"status" json:"status,omitempty"` + SignatureVersion string `msg:"signature_version" json:"signature_version,omitempty"` +} + +type AccessLogHTTP struct { + RequestURI string `json:"request_uri,omitempty"` // "GET /awsexamplebucket1/photos/2019/08/puppy.jpg?x-foo=bar HTTP/1.1" + BytesSent string `json:"bytes_sent,omitempty"` + ObjectSize string `json:"object_size,omitempty"` + TotalTime int `json:"total_time,omitempty"` + TurnAroundTime int `json:"turn_around_time,omitempty"` + Referer string `json:"Referer,omitempty"` + VersionId string `json:"version_id,omitempty"` + CipherSuite string `json:"cipher_suite,omitempty"` + AuthenticationType string `json:"auth_type,omitempty"` + TLSVersion string `json:"TLS_version,omitempty"` +} + +const tag = "s3.access" + +var ( + Logger *fluent.Fluent + hostname = os.Getenv("HOSTNAME") + environment = os.Getenv("ENVIRONMENT") +) + +func InitAuditLog(config string) { + configContent, readErr := os.ReadFile(config) + if readErr != nil { + glog.Errorf("fail to read fluent config %s : %v", config, readErr) + return + } + fluentConfig := &fluent.Config{} + if err := json.Unmarshal(configContent, fluentConfig); err != nil { + glog.Errorf("fail to parse fluent config %s : %v", string(configContent), err) + return + } + if len(fluentConfig.TagPrefix) == 0 && len(environment) > 0 { + fluentConfig.TagPrefix = environment + } + fluentConfig.Async = true + fluentConfig.AsyncResultCallback = func(data []byte, err error) { + if err != nil { + glog.Warning("Error while posting log: ", err) + } + } + var err error + Logger, err = fluent.New(*fluentConfig) + if err != nil { + glog.Errorf("fail to load fluent config: %v", err) + } +} + +func getREST(httpMetod string, resourceType string) string { + return fmt.Sprintf("REST.%s.%s", httpMetod, resourceType) +} + +func getResourceType(object string, query_key string, metod string) (string, bool) { + if object == "/" { + switch query_key { + case "delete": + return "BATCH.DELETE.OBJECT", true + case "tagging": + return getREST(metod, "OBJECTTAGGING"), true + case "lifecycle": + return getREST(metod, "LIFECYCLECONFIGURATION"), true + case "acl": + return getREST(metod, "ACCESSCONTROLPOLICY"), true + case "policy": + return getREST(metod, "BUCKETPOLICY"), true + default: + return getREST(metod, "BUCKET"), false + } + } else { + switch query_key { + case "tagging": + return getREST(metod, "OBJECTTAGGING"), true + default: + return getREST(metod, "OBJECT"), false + } + } +} + +func getOperation(object string, r *http.Request) string { + queries := r.URL.Query() + var operation string + var queryFound bool + for key, _ := range queries { + operation, queryFound = getResourceType(object, key, r.Method) + if queryFound { + return operation + } + } + if len(queries) == 0 { + operation, _ = getResourceType(object, "", r.Method) + } + return operation +} + +func GetAccessHttpLog(r *http.Request, statusCode int, s3errCode ErrorCode) AccessLogHTTP { + return AccessLogHTTP{ + RequestURI: r.RequestURI, + Referer: r.Header.Get("Referer"), + } +} + +func GetAccessLog(r *http.Request, HTTPStatusCode int, s3errCode ErrorCode) *AccessLog { + bucket, key := s3_constants.GetBucketAndObject(r) + var errorCode string + if s3errCode != ErrNone { + errorCode = GetAPIError(s3errCode).Code + } + remoteIP := r.Header.Get("X-Real-IP") + if len(remoteIP) == 0 { + remoteIP = r.RemoteAddr + } + hostHeader := r.Header.Get("X-Forwarded-Host") + if len(hostHeader) == 0 { + hostHeader = r.Host + } + return &AccessLog{ + HostHeader: hostHeader, + RequestID: r.Header.Get("X-Request-ID"), + RemoteIP: remoteIP, + Requester: r.Header.Get(s3_constants.AmzIdentityId), + SignatureVersion: r.Header.Get(s3_constants.AmzAuthType), + UserAgent: r.Header.Get("user-agent"), + HostId: hostname, + Bucket: bucket, + HTTPStatus: HTTPStatusCode, + Time: time.Now().Unix(), + Key: key, + Operation: getOperation(key, r), + ErrorCode: errorCode, + } +} + +func PostLog(r *http.Request, HTTPStatusCode int, errorCode ErrorCode) { + if Logger == nil { + return + } + if err := Logger.Post(tag, *GetAccessLog(r, HTTPStatusCode, errorCode)); err != nil { + glog.Warning("Error while posting log: ", err) + } +} + +func PostAccessLog(log AccessLog) { + if Logger == nil || len(log.Key) == 0 { + return + } + if err := Logger.Post(tag, log); err != nil { + glog.Warning("Error while posting log: ", err) + } +} diff --git a/weed/s3api/s3err/error_handler.go b/weed/s3api/s3err/error_handler.go index c1065fffc..6753a1641 100644 --- a/weed/s3api/s3err/error_handler.go +++ b/weed/s3api/s3err/error_handler.go @@ -19,15 +19,16 @@ const ( MimeXML mimeType = "application/xml" ) -func WriteXMLResponse(w http.ResponseWriter, statusCode int, response interface{}) { - WriteResponse(w, statusCode, EncodeXMLResponse(response), MimeXML) +func WriteXMLResponse(w http.ResponseWriter, r *http.Request, statusCode int, response interface{}) { + WriteResponse(w, r, statusCode, EncodeXMLResponse(response), MimeXML) } -func WriteEmptyResponse(w http.ResponseWriter, statusCode int) { - WriteResponse(w, statusCode, []byte{}, mimeNone) +func WriteEmptyResponse(w http.ResponseWriter, r *http.Request, statusCode int) { + WriteResponse(w, r, statusCode, []byte{}, mimeNone) + PostLog(r, statusCode, ErrNone) } -func WriteErrorResponse(w http.ResponseWriter, errorCode ErrorCode, r *http.Request) { +func WriteErrorResponse(w http.ResponseWriter, r *http.Request, errorCode ErrorCode) { vars := mux.Vars(r) bucket := vars["bucket"] object := vars["object"] @@ -38,7 +39,8 @@ func WriteErrorResponse(w http.ResponseWriter, errorCode ErrorCode, r *http.Requ apiError := GetAPIError(errorCode) errorResponse := getRESTErrorResponse(apiError, r.URL.Path, bucket, object) encodedErrorResponse := EncodeXMLResponse(errorResponse) - WriteResponse(w, apiError.HTTPStatusCode, encodedErrorResponse, MimeXML) + WriteResponse(w, r, apiError.HTTPStatusCode, encodedErrorResponse, MimeXML) + PostLog(r, apiError.HTTPStatusCode, errorCode) } func getRESTErrorResponse(err APIError, resource string, bucket, object string) RESTErrorResponse { @@ -61,13 +63,17 @@ func EncodeXMLResponse(response interface{}) []byte { return bytesBuffer.Bytes() } -func setCommonHeaders(w http.ResponseWriter) { +func setCommonHeaders(w http.ResponseWriter, r *http.Request) { w.Header().Set("x-amz-request-id", fmt.Sprintf("%d", time.Now().UnixNano())) w.Header().Set("Accept-Ranges", "bytes") + if r.Header.Get("Origin") != "" { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Credentials", "true") + } } -func WriteResponse(w http.ResponseWriter, statusCode int, response []byte, mType mimeType) { - setCommonHeaders(w) +func WriteResponse(w http.ResponseWriter, r *http.Request, statusCode int, response []byte, mType mimeType) { + setCommonHeaders(w, r) if response != nil { w.Header().Set("Content-Length", strconv.Itoa(len(response))) } @@ -88,5 +94,5 @@ func WriteResponse(w http.ResponseWriter, statusCode int, response []byte, mType // If none of the http routes match respond with MethodNotAllowed func NotFoundHandler(w http.ResponseWriter, r *http.Request) { glog.V(0).Infof("unsupported %s %s", r.Method, r.RequestURI) - WriteErrorResponse(w, ErrMethodNotAllowed, r) + WriteErrorResponse(w, r, ErrMethodNotAllowed) } diff --git a/weed/s3api/s3err/s3-error.go b/weed/s3api/s3err/s3-error.go index 224378ec5..b87764742 100644 --- a/weed/s3api/s3err/s3-error.go +++ b/weed/s3api/s3err/s3-error.go @@ -58,4 +58,5 @@ var s3ErrorResponseMap = map[string]string{ "InvalidDuration": "Duration provided in the request is invalid.", "XAmzContentSHA256Mismatch": "The provided 'x-amz-content-sha256' header does not match what was computed.", // Add new API errors here. + "NoSuchCORSConfiguration": "The CORS configuration does not exist", } diff --git a/weed/s3api/s3err/s3api_errors.go b/weed/s3api/s3err/s3api_errors.go index a46bd0f04..57f269a2e 100644 --- a/weed/s3api/s3err/s3api_errors.go +++ b/weed/s3api/s3err/s3api_errors.go @@ -51,6 +51,9 @@ const ( ErrBucketAlreadyExists ErrBucketAlreadyOwnedByYou ErrNoSuchBucket + ErrNoSuchBucketPolicy + ErrNoSuchCORSConfiguration + ErrNoSuchLifecycleConfiguration ErrNoSuchKey ErrNoSuchUpload ErrInvalidBucketName @@ -58,8 +61,10 @@ const ( ErrInvalidMaxKeys ErrInvalidMaxUploads ErrInvalidMaxParts + ErrInvalidMaxDeleteObjects ErrInvalidPartNumberMarker ErrInvalidPart + ErrInvalidRange ErrInternalError ErrInvalidCopyDest ErrInvalidCopySource @@ -98,6 +103,10 @@ const ( ErrPreconditionFailed ErrExistingObjectIsDirectory + ErrExistingObjectIsFile + + ErrTooManyRequest + ErrRequestBytesExceed ) // error code to APIError structure, these fields carry respective @@ -153,6 +162,11 @@ var errorCodeResponse = map[ErrorCode]APIError{ Description: "Argument max-parts must be an integer between 0 and 2147483647", HTTPStatusCode: http.StatusBadRequest, }, + ErrInvalidMaxDeleteObjects: { + Code: "InvalidArgument", + Description: "Argument objects can contain a list of up to 1000 keys", + HTTPStatusCode: http.StatusBadRequest, + }, ErrInvalidPartNumberMarker: { Code: "InvalidArgument", Description: "Argument partNumberMarker must be an integer.", @@ -163,6 +177,21 @@ var errorCodeResponse = map[ErrorCode]APIError{ Description: "The specified bucket does not exist", HTTPStatusCode: http.StatusNotFound, }, + ErrNoSuchBucketPolicy: { + Code: "NoSuchBucketPolicy", + Description: "The bucket policy does not exist", + HTTPStatusCode: http.StatusNotFound, + }, + ErrNoSuchCORSConfiguration: { + Code: "NoSuchCORSConfiguration", + Description: "The CORS configuration does not exist", + HTTPStatusCode: http.StatusNotFound, + }, + ErrNoSuchLifecycleConfiguration: { + Code: "NoSuchLifecycleConfiguration", + Description: "The lifecycle configuration does not exist", + HTTPStatusCode: http.StatusNotFound, + }, ErrNoSuchKey: { Code: "NoSuchKey", Description: "The specified key does not exist.", @@ -196,7 +225,7 @@ var errorCodeResponse = map[ErrorCode]APIError{ HTTPStatusCode: http.StatusBadRequest, }, ErrInvalidTag: { - Code: "InvalidArgument", + Code: "InvalidTag", Description: "The Tag value you have provided is invalid", HTTPStatusCode: http.StatusBadRequest, }, @@ -345,6 +374,11 @@ var errorCodeResponse = map[ErrorCode]APIError{ Description: "Invalid Request", HTTPStatusCode: http.StatusBadRequest, }, + ErrInvalidRange: { + Code: "InvalidRange", + Description: "The requested range is not satisfiable", + HTTPStatusCode: http.StatusRequestedRangeNotSatisfiable, + }, ErrAuthNotSetup: { Code: "InvalidRequest", Description: "Signed request requires setting up SeaweedFS S3 authentication", @@ -365,6 +399,21 @@ var errorCodeResponse = map[ErrorCode]APIError{ Description: "Existing Object is a directory.", HTTPStatusCode: http.StatusConflict, }, + ErrExistingObjectIsFile: { + Code: "ExistingObjectIsFile", + Description: "Existing Object is a file.", + HTTPStatusCode: http.StatusConflict, + }, + ErrTooManyRequest: { + Code: "ErrTooManyRequest", + Description: "Too many simultaneous request count", + HTTPStatusCode: http.StatusTooManyRequests, + }, + ErrRequestBytesExceed: { + Code: "ErrRequestBytesExceed", + Description: "Simultaneous request bytes exceed limitations", + HTTPStatusCode: http.StatusTooManyRequests, + }, } // GetAPIError provides API Error for input API error code. diff --git a/weed/s3api/stats.go b/weed/s3api/stats.go index b667b32a0..003807a25 100644 --- a/weed/s3api/stats.go +++ b/weed/s3api/stats.go @@ -1,8 +1,8 @@ package s3api import ( + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" stats_collect "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/util" "net/http" "strconv" "time" @@ -28,11 +28,12 @@ func (r *StatusRecorder) Flush() { func track(f http.HandlerFunc, action string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Server", "SeaweedFS S3 "+util.VERSION) + bucket, _ := s3_constants.GetBucketAndObject(r) + w.Header().Set("Server", "SeaweedFS S3") recorder := NewStatusResponseWriter(w) start := time.Now() f(recorder, r) - stats_collect.S3RequestHistogram.WithLabelValues(action).Observe(time.Since(start).Seconds()) - stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status)).Inc() + stats_collect.S3RequestHistogram.WithLabelValues(action, bucket).Observe(time.Since(start).Seconds()) + stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status), bucket).Inc() } } diff --git a/weed/s3api/tags.go b/weed/s3api/tags.go index 9ff7d1fba..979e5a80c 100644 --- a/weed/s3api/tags.go +++ b/weed/s3api/tags.go @@ -14,8 +14,9 @@ type TagSet struct { } type Tagging struct { - XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Tagging"` + XMLName xml.Name `xml:"Tagging"` TagSet TagSet `xml:"TagSet"` + Xmlns string `xml:"xmlns,attr"` } func (t *Tagging) ToTags() map[string]string { @@ -27,7 +28,7 @@ func (t *Tagging) ToTags() map[string]string { } func FromTags(tags map[string]string) (t *Tagging) { - t = &Tagging{} + t = &Tagging{Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/"} for k, v := range tags { t.TagSet.Tag = append(t.TagSet.Tag, Tag{ Key: k, diff --git a/weed/s3api/tags_test.go b/weed/s3api/tags_test.go index 52adb36c1..d8beb1922 100644 --- a/weed/s3api/tags_test.go +++ b/weed/s3api/tags_test.go @@ -32,6 +32,7 @@ func TestXMLUnmarshall(t *testing.T) { func TestXMLMarshall(t *testing.T) { tags := &Tagging{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", TagSet: TagSet{ []Tag{ { |
