aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_server.go')
-rw-r--r--weed/s3api/s3api_server.go52
1 files changed, 31 insertions, 21 deletions
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index dcf3a55f2..a1a3f100b 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -9,6 +9,7 @@ import (
"os"
"slices"
"strings"
+ "sync"
"time"
"github.com/gorilla/mux"
@@ -48,22 +49,27 @@ type S3ApiServerOption struct {
DataCenter string
FilerGroup string
IamConfig string // Advanced IAM configuration file path
+ ConcurrentUploadLimit int64
+ ConcurrentFileUploadLimit int64
}
type S3ApiServer struct {
s3_pb.UnimplementedSeaweedS3Server
- option *S3ApiServerOption
- iam *IdentityAccessManagement
- iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication
- cb *CircuitBreaker
- randomClientId int32
- filerGuard *security.Guard
- filerClient *wdclient.FilerClient
- client util_http_client.HTTPClientInterface
- bucketRegistry *BucketRegistry
- credentialManager *credential.CredentialManager
- bucketConfigCache *BucketConfigCache
- policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies
+ option *S3ApiServerOption
+ iam *IdentityAccessManagement
+ iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication
+ cb *CircuitBreaker
+ randomClientId int32
+ filerGuard *security.Guard
+ filerClient *wdclient.FilerClient
+ client util_http_client.HTTPClientInterface
+ bucketRegistry *BucketRegistry
+ credentialManager *credential.CredentialManager
+ bucketConfigCache *BucketConfigCache
+ policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies
+ inFlightDataSize int64
+ inFlightUploads int64
+ inFlightDataLimitCond *sync.Cond
}
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
@@ -135,17 +141,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
}
s3ApiServer = &S3ApiServer{
- option: option,
- iam: iam,
- randomClientId: util.RandomInt32(),
- filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
- filerClient: filerClient,
- cb: NewCircuitBreaker(option),
- credentialManager: iam.credentialManager,
- bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
- policyEngine: policyEngine, // Initialize bucket policy engine
+ option: option,
+ iam: iam,
+ randomClientId: util.RandomInt32(),
+ filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
+ filerClient: filerClient,
+ cb: NewCircuitBreaker(option),
+ credentialManager: iam.credentialManager,
+ bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
+ policyEngine: policyEngine, // Initialize bucket policy engine
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
+ // Set s3a reference in circuit breaker for upload limiting
+ s3ApiServer.cb.s3a = s3ApiServer
+
// Pass policy engine to IAM for bucket policy evaluation
// This avoids circular dependency by not passing the entire S3ApiServer
iam.policyEngine = policyEngine