diff options
| author | 石昌林 <changlin.shi@ly.com> | 2022-06-15 21:07:55 +0800 |
|---|---|---|
| committer | 石昌林 <changlin.shi@ly.com> | 2022-06-15 21:07:55 +0800 |
| commit | 78b372816935c663f7f56b92e0e79e37e99cd4d2 (patch) | |
| tree | e5867fad83ef484f406f660097bbaf07258c6dcd /weed/s3api/s3api_server.go | |
| parent | b22ca85fbb674c445096b739af142dbf53dbd72b (diff) | |
| download | seaweedfs-78b372816935c663f7f56b92e0e79e37e99cd4d2.tar.xz seaweedfs-78b372816935c663f7f56b92e0e79e37e99cd4d2.zip | |
add s3 circuit breaker support for 'simultaneous request count' and 'simultaneous request bytes' limitations
configure s3 circuit breaker by 'command_s3_circuitbreaker.go':
usage eg:
# Configure the number of simultaneous global (current s3api node) requests
s3.circuit.breaker -global -type count -actions Write -values 1000 -apply
# Configure the number of simultaneous requests for bucket x read and write
s3.circuit.breaker -buckets -type count -actions Read,Write -values 1000 -apply
# Configure the total bytes of simultaneous requests for bucket write
s3.circuit.breaker -buckets -type bytes -actions Write -values 100MiB -apply
# Disable circuit breaker config of bucket 'x'
s3.circuit.breaker -buckets x -enable false -apply
# Delete circuit breaker config of bucket 'x'
s3.circuit.breaker -buckets x -delete -apply
Diffstat (limited to 'weed/s3api/s3api_server.go')
| -rw-r--r-- | weed/s3api/s3api_server.go | 83 |
1 files changed, 42 insertions, 41 deletions
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 657fa8171..d439ef8f1 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb" . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" @@ -35,6 +34,7 @@ type S3ApiServer struct { s3_pb.UnimplementedSeaweedS3Server option *S3ApiServerOption iam *IdentityAccessManagement + cb *CircuitBreaker randomClientId int32 filerGuard *security.Guard client *http.Client @@ -55,6 +55,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer iam: NewIdentityAccessManagement(option), randomClientId: util.RandomInt32(), filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), + cb: NewCircuitBreaker(option), } if option.LocalFilerSocket == nil || *option.LocalFilerSocket == "" { s3ApiServer.client = &http.Client{Transport: &http.Transport{ @@ -73,7 +74,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer s3ApiServer.registerRouter(router) - go s3ApiServer.subscribeMetaEvents("s3", filer.IamConfigDirecotry+"/"+filer.IamIdentityFile, time.Now().UnixNano()) + go s3ApiServer.subscribeMetaEvents("s3", "/etc", time.Now().UnixNano()) return s3ApiServer, nil } @@ -107,115 +108,115 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { // objects with query // CopyObjectPart - bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectPartHandler, ACTION_WRITE), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") + bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.CopyObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") // PutObjectPart - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectPartHandler, ACTION_WRITE), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") // CompleteMultipartUpload - bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.CompleteMultipartUploadHandler, ACTION_WRITE), "POST")).Queries("uploadId", "{uploadId:.*}") + bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.CompleteMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploadId", "{uploadId:.*}") // NewMultipartUpload - bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.NewMultipartUploadHandler, ACTION_WRITE), "POST")).Queries("uploads", "") + bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.NewMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploads", "") // AbortMultipartUpload - bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.AbortMultipartUploadHandler, ACTION_WRITE), "DELETE")).Queries("uploadId", "{uploadId:.*}") + bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.AbortMultipartUploadHandler, ACTION_WRITE)), "DELETE")).Queries("uploadId", "{uploadId:.*}") // ListObjectParts - bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectPartsHandler, ACTION_READ), "GET")).Queries("uploadId", "{uploadId:.*}") + bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.ListObjectPartsHandler, ACTION_READ)), "GET")).Queries("uploadId", "{uploadId:.*}") // ListMultipartUploads - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_READ), "GET")).Queries("uploads", "") + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.ListMultipartUploadsHandler, ACTION_READ)), "GET")).Queries("uploads", "") // GetObjectTagging - bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectTaggingHandler, ACTION_READ), "GET")).Queries("tagging", "") + bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.GetObjectTaggingHandler, ACTION_READ)), "GET")).Queries("tagging", "") // PutObjectTagging - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectTaggingHandler, ACTION_TAGGING), "PUT")).Queries("tagging", "") + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutObjectTaggingHandler, ACTION_TAGGING)), "PUT")).Queries("tagging", "") // DeleteObjectTagging - bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING), "DELETE")).Queries("tagging", "") + bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING)), "DELETE")).Queries("tagging", "") // PutObjectACL - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectAclHandler, ACTION_WRITE), "PUT")).Queries("acl", "") + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutObjectAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "") // PutObjectRetention - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectRetentionHandler, ACTION_WRITE), "PUT")).Queries("retention", "") + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutObjectRetentionHandler, ACTION_WRITE)), "PUT")).Queries("retention", "") // PutObjectLegalHold - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectLegalHoldHandler, ACTION_WRITE), "PUT")).Queries("legal-hold", "") + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutObjectLegalHoldHandler, ACTION_WRITE)), "PUT")).Queries("legal-hold", "") // PutObjectLockConfiguration - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE), "PUT")).Queries("object-lock", "") + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("object-lock", "") // GetObjectACL - bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectAclHandler, ACTION_READ), "GET")).Queries("acl", "") + bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.GetObjectAclHandler, ACTION_READ)), "GET")).Queries("acl", "") // objects with query // raw objects // HeadObject - bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.HeadObjectHandler, ACTION_READ), "GET")) + bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.HeadObjectHandler, ACTION_READ)), "GET")) // GetObject, but directory listing is not supported - bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectHandler, ACTION_READ), "GET")) + bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.GetObjectHandler, ACTION_READ)), "GET")) // CopyObject - bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY")) + bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.CopyObjectHandler, ACTION_WRITE)), "COPY")) // PutObject - bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectHandler, ACTION_WRITE), "PUT")) + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutObjectHandler, ACTION_WRITE)), "PUT")) // DeleteObject - bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectHandler, ACTION_WRITE), "DELETE")) + bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.DeleteObjectHandler, ACTION_WRITE)), "DELETE")) // raw objects // buckets with query // DeleteMultipleObjects - bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE), "DELETE")).Queries("delete", "") + bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE)), "DELETE")).Queries("delete", "") // GetBucketACL - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketAclHandler, ACTION_READ), "GET")).Queries("acl", "") + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.GetBucketAclHandler, ACTION_READ)), "GET")).Queries("acl", "") // PutBucketACL - bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketAclHandler, ACTION_WRITE), "PUT")).Queries("acl", "") + bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutBucketAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "") // GetBucketPolicy - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketPolicyHandler, ACTION_READ), "GET")).Queries("policy", "") + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.GetBucketPolicyHandler, ACTION_READ)), "GET")).Queries("policy", "") // PutBucketPolicy - bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketPolicyHandler, ACTION_WRITE), "PUT")).Queries("policy", "") + bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutBucketPolicyHandler, ACTION_WRITE)), "PUT")).Queries("policy", "") // DeleteBucketPolicy - bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketPolicyHandler, ACTION_WRITE), "DELETE")).Queries("policy", "") + bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.DeleteBucketPolicyHandler, ACTION_WRITE)), "DELETE")).Queries("policy", "") // GetBucketCors - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketCorsHandler, ACTION_READ), "GET")).Queries("cors", "") + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.GetBucketCorsHandler, ACTION_READ)), "GET")).Queries("cors", "") // PutBucketCors - bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketCorsHandler, ACTION_WRITE), "PUT")).Queries("cors", "") + bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutBucketCorsHandler, ACTION_WRITE)), "PUT")).Queries("cors", "") // DeleteBucketCors - bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketCorsHandler, ACTION_WRITE), "DELETE")).Queries("cors", "") + bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.DeleteBucketCorsHandler, ACTION_WRITE)), "DELETE")).Queries("cors", "") // GetBucketLifecycleConfiguration - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketLifecycleConfigurationHandler, ACTION_READ), "GET")).Queries("lifecycle", "") + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.GetBucketLifecycleConfigurationHandler, ACTION_READ)), "GET")).Queries("lifecycle", "") // PutBucketLifecycleConfiguration - bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketLifecycleConfigurationHandler, ACTION_WRITE), "PUT")).Queries("lifecycle", "") + bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PutBucketLifecycleConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("lifecycle", "") // DeleteBucketLifecycleConfiguration - bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketLifecycleHandler, ACTION_WRITE), "DELETE")).Queries("lifecycle", "") + bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.DeleteBucketLifecycleHandler, ACTION_WRITE)), "DELETE")).Queries("lifecycle", "") // GetBucketLocation - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketLocationHandler, ACTION_READ), "GET")).Queries("location", "") + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.GetBucketLocationHandler, ACTION_READ)), "GET")).Queries("location", "") // GetBucketRequestPayment - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketRequestPaymentHandler, ACTION_READ), "GET")).Queries("requestPayment", "") + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.GetBucketRequestPaymentHandler, ACTION_READ)), "GET")).Queries("requestPayment", "") // ListObjectsV2 - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV2Handler, ACTION_LIST), "LIST")).Queries("list-type", "2") + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.ListObjectsV2Handler, ACTION_LIST)), "LIST")).Queries("list-type", "2") // buckets with query // raw buckets // PostPolicy - bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.PostPolicyBucketHandler, ACTION_WRITE), "POST")) + bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.PostPolicyBucketHandler, ACTION_WRITE)), "POST")) // HeadBucket - bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.HeadBucketHandler, ACTION_READ), "GET")) + bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.HeadBucketHandler, ACTION_READ)), "GET")) // PutBucket bucket.Methods("PUT").HandlerFunc(track(s3a.PutBucketHandler, "PUT")) // DeleteBucket - bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketHandler, ACTION_WRITE), "DELETE")) + bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.DeleteBucketHandler, ACTION_WRITE)), "DELETE")) // ListObjectsV1 (Legacy) - bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV1Handler, ACTION_LIST), "LIST")) + bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Check(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST")) // raw buckets |
