diff options
| -rw-r--r-- | weed/command/filer.go | 107 | ||||
| -rw-r--r-- | weed/command/s3.go | 6 | ||||
| -rw-r--r-- | weed/command/server.go | 3 | ||||
| -rw-r--r-- | weed/s3api/s3api_circuit_breaker.go | 43 | ||||
| -rw-r--r-- | weed/s3api/s3api_server.go | 52 | ||||
| -rw-r--r-- | weed/s3api/s3err/s3api_errors.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 46 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers.go | 18 |
8 files changed, 181 insertions, 98 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go index 053c5a147..86991a181 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -42,38 +42,39 @@ var ( ) type FilerOptions struct { - masters *pb.ServerDiscovery - mastersString *string - ip *string - bindIp *string - port *int - portGrpc *int - publicPort *int - filerGroup *string - collection *string - defaultReplicaPlacement *string - disableDirListing *bool - maxMB *int - dirListingLimit *int - dataCenter *string - rack *string - enableNotification *bool - disableHttp *bool - cipher *bool - metricsHttpPort *int - metricsHttpIp *string - saveToFilerLimit *int - defaultLevelDbDirectory *string - concurrentUploadLimitMB *int - debug *bool - debugPort *int - localSocket *string - showUIDirectoryDelete *bool - downloadMaxMBps *int - diskType *string - allowedOrigins *string - exposeDirectoryData *bool - certProvider certprovider.Provider + masters *pb.ServerDiscovery + mastersString *string + ip *string + bindIp *string + port *int + portGrpc *int + publicPort *int + filerGroup *string + collection *string + defaultReplicaPlacement *string + disableDirListing *bool + maxMB *int + dirListingLimit *int + dataCenter *string + rack *string + enableNotification *bool + disableHttp *bool + cipher *bool + metricsHttpPort *int + metricsHttpIp *string + saveToFilerLimit *int + defaultLevelDbDirectory *string + concurrentUploadLimitMB *int + concurrentFileUploadLimit *int + debug *bool + debugPort *int + localSocket *string + showUIDirectoryDelete *bool + downloadMaxMBps *int + diskType *string + allowedOrigins *string + exposeDirectoryData *bool + certProvider certprovider.Provider } func init() { @@ -99,6 +100,7 @@ func init() { f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") + f.concurrentFileUploadLimit = cmdFiler.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2") f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging") f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock") @@ -127,6 +129,8 @@ func init() { filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate") filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 10, "connection idle seconds") + filerS3Options.concurrentUploadLimitMB = cmdFiler.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") + filerS3Options.concurrentFileUploadLimit = cmdFiler.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited") // start webdav on filer filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway") @@ -310,25 +314,26 @@ func (fo *FilerOptions) startFiler() { filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc) fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ - Masters: fo.masters, - FilerGroup: *fo.filerGroup, - Collection: *fo.collection, - DefaultReplication: *fo.defaultReplicaPlacement, - DisableDirListing: *fo.disableDirListing, - MaxMB: *fo.maxMB, - DirListingLimit: *fo.dirListingLimit, - DataCenter: *fo.dataCenter, - Rack: *fo.rack, - DefaultLevelDbDir: defaultLevelDbDirectory, - DisableHttp: *fo.disableHttp, - Host: filerAddress, - Cipher: *fo.cipher, - SaveToFilerLimit: int64(*fo.saveToFilerLimit), - ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, - ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, - DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, - DiskType: *fo.diskType, - AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), + Masters: fo.masters, + FilerGroup: *fo.filerGroup, + Collection: *fo.collection, + DefaultReplication: *fo.defaultReplicaPlacement, + DisableDirListing: *fo.disableDirListing, + MaxMB: *fo.maxMB, + DirListingLimit: *fo.dirListingLimit, + DataCenter: *fo.dataCenter, + Rack: *fo.rack, + DefaultLevelDbDir: defaultLevelDbDirectory, + DisableHttp: *fo.disableHttp, + Host: filerAddress, + Cipher: *fo.cipher, + SaveToFilerLimit: int64(*fo.saveToFilerLimit), + ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, + ConcurrentFileUploadLimit: int64(*fo.concurrentFileUploadLimit), + ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, + DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, + DiskType: *fo.diskType, + AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/s3.go b/weed/command/s3.go index 995d15f8a..61222336b 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -57,6 +57,8 @@ type S3Options struct { localSocket *string certProvider certprovider.Provider idleTimeout *int + concurrentUploadLimitMB *int + concurrentFileUploadLimit *int } func init() { @@ -83,6 +85,8 @@ func init() { s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path") s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock") s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 10, "connection idle seconds") + s3StandaloneOptions.concurrentUploadLimitMB = cmdS3.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") + s3StandaloneOptions.concurrentFileUploadLimit = cmdS3.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") } var cmdS3 = &Command{ @@ -275,6 +279,8 @@ func (s3opt *S3Options) startS3Server() bool { DataCenter: *s3opt.dataCenter, FilerGroup: filerGroup, IamConfig: iamConfigPath, // Advanced IAM config (optional) + ConcurrentUploadLimit: int64(*s3opt.concurrentUploadLimitMB) * 1024 * 1024, + ConcurrentFileUploadLimit: int64(*s3opt.concurrentFileUploadLimit), }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/command/server.go b/weed/command/server.go index 3cdde48c6..47df30fc2 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -123,6 +123,7 @@ func init() { filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers") filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size") + filerOptions.concurrentFileUploadLimit = cmdServer.Flag.Int("filer.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock") filerOptions.showUIDirectoryDelete = cmdServer.Flag.Bool("filer.ui.deleteDir", true, "enable filer UI show delete directory button") filerOptions.downloadMaxMBps = cmdServer.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second") @@ -168,6 +169,8 @@ func init() { s3Options.localSocket = cmdServer.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock") s3Options.bindIp = cmdServer.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") s3Options.idleTimeout = cmdServer.Flag.Int("s3.idleTimeout", 10, "connection idle seconds") + s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") + s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited") sftpOptions.port = cmdServer.Flag.Int("sftp.port", 2022, "SFTP server listen port") sftpOptions.sshPrivateKey = cmdServer.Flag.String("sftp.sshPrivateKey", "", "path to the SSH private key file for host authentication") diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 2f5e1f580..3c4f55a23 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -21,6 +21,7 @@ type CircuitBreaker struct { Enabled bool counters map[string]*int64 limitations map[string]int64 + s3a *S3ApiServer } func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { @@ -89,6 +90,48 @@ func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerCo func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) { return func(w http.ResponseWriter, r *http.Request) { + // Apply upload limiting for write actions if configured + if cb.s3a != nil && (action == s3_constants.ACTION_WRITE) && + (cb.s3a.option.ConcurrentUploadLimit != 0 || cb.s3a.option.ConcurrentFileUploadLimit != 0) { + + // Get content length, default to 0 if not provided + contentLength := r.ContentLength + if contentLength < 0 { + contentLength = 0 + } + + // Wait until in flight data is less than the limit + cb.s3a.inFlightDataLimitCond.L.Lock() + inFlightDataSize := atomic.LoadInt64(&cb.s3a.inFlightDataSize) + inFlightUploads := atomic.LoadInt64(&cb.s3a.inFlightUploads) + + // Wait if either data size limit or file count limit is exceeded + for (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) || + (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) { + if (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) { + glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, cb.s3a.option.ConcurrentUploadLimit) + } + if (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) { + glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, cb.s3a.option.ConcurrentFileUploadLimit) + } + cb.s3a.inFlightDataLimitCond.Wait() + inFlightDataSize = atomic.LoadInt64(&cb.s3a.inFlightDataSize) + inFlightUploads = atomic.LoadInt64(&cb.s3a.inFlightUploads) + } + cb.s3a.inFlightDataLimitCond.L.Unlock() + + // Increment counters + atomic.AddInt64(&cb.s3a.inFlightUploads, 1) + atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength) + defer func() { + // Decrement counters + atomic.AddInt64(&cb.s3a.inFlightUploads, -1) + atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength) + cb.s3a.inFlightDataLimitCond.Signal() + }() + } + + // Apply circuit breaker logic if !cb.Enabled { f(w, r) return diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index dcf3a55f2..a1a3f100b 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -9,6 +9,7 @@ import ( "os" "slices" "strings" + "sync" "time" "github.com/gorilla/mux" @@ -48,22 +49,27 @@ type S3ApiServerOption struct { DataCenter string FilerGroup string IamConfig string // Advanced IAM configuration file path + ConcurrentUploadLimit int64 + ConcurrentFileUploadLimit int64 } type S3ApiServer struct { s3_pb.UnimplementedSeaweedS3Server - option *S3ApiServerOption - iam *IdentityAccessManagement - iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication - cb *CircuitBreaker - randomClientId int32 - filerGuard *security.Guard - filerClient *wdclient.FilerClient - client util_http_client.HTTPClientInterface - bucketRegistry *BucketRegistry - credentialManager *credential.CredentialManager - bucketConfigCache *BucketConfigCache - policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies + option *S3ApiServerOption + iam *IdentityAccessManagement + iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication + cb *CircuitBreaker + randomClientId int32 + filerGuard *security.Guard + filerClient *wdclient.FilerClient + client util_http_client.HTTPClientInterface + bucketRegistry *BucketRegistry + credentialManager *credential.CredentialManager + bucketConfigCache *BucketConfigCache + policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies + inFlightDataSize int64 + inFlightUploads int64 + inFlightDataLimitCond *sync.Cond } func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { @@ -135,17 +141,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl } s3ApiServer = &S3ApiServer{ - option: option, - iam: iam, - randomClientId: util.RandomInt32(), - filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), - filerClient: filerClient, - cb: NewCircuitBreaker(option), - credentialManager: iam.credentialManager, - bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven - policyEngine: policyEngine, // Initialize bucket policy engine + option: option, + iam: iam, + randomClientId: util.RandomInt32(), + filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), + filerClient: filerClient, + cb: NewCircuitBreaker(option), + credentialManager: iam.credentialManager, + bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven + policyEngine: policyEngine, // Initialize bucket policy engine + inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), } + // Set s3a reference in circuit breaker for upload limiting + s3ApiServer.cb.s3a = s3ApiServer + // Pass policy engine to IAM for bucket policy evaluation // This avoids circular dependency by not passing the entire S3ApiServer iam.policyEngine = policyEngine diff --git a/weed/s3api/s3err/s3api_errors.go b/weed/s3api/s3err/s3api_errors.go index 762289bce..189c6ba86 100644 --- a/weed/s3api/s3err/s3api_errors.go +++ b/weed/s3api/s3err/s3api_errors.go @@ -498,12 +498,12 @@ var errorCodeResponse = map[ErrorCode]APIError{ ErrTooManyRequest: { Code: "ErrTooManyRequest", Description: "Too many simultaneous request count", - HTTPStatusCode: http.StatusTooManyRequests, + HTTPStatusCode: http.StatusServiceUnavailable, }, ErrRequestBytesExceed: { Code: "ErrRequestBytesExceed", Description: "Simultaneous request bytes exceed limitations", - HTTPStatusCode: http.StatusTooManyRequests, + HTTPStatusCode: http.StatusServiceUnavailable, }, OwnershipControlsNotFoundError: { diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 3d08c0980..95d344af4 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -56,32 +56,34 @@ import ( ) type FilerOption struct { - Masters *pb.ServerDiscovery - FilerGroup string - Collection string - DefaultReplication string - DisableDirListing bool - MaxMB int - DirListingLimit int - DataCenter string - Rack string - DataNode string - DefaultLevelDbDir string - DisableHttp bool - Host pb.ServerAddress - recursiveDelete bool - Cipher bool - SaveToFilerLimit int64 - ConcurrentUploadLimit int64 - ShowUIDirectoryDelete bool - DownloadMaxBytesPs int64 - DiskType string - AllowedOrigins []string - ExposeDirectoryData bool + Masters *pb.ServerDiscovery + FilerGroup string + Collection string + DefaultReplication string + DisableDirListing bool + MaxMB int + DirListingLimit int + DataCenter string + Rack string + DataNode string + DefaultLevelDbDir string + DisableHttp bool + Host pb.ServerAddress + recursiveDelete bool + Cipher bool + SaveToFilerLimit int64 + ConcurrentUploadLimit int64 + ConcurrentFileUploadLimit int64 + ShowUIDirectoryDelete bool + DownloadMaxBytesPs int64 + DiskType string + AllowedOrigins []string + ExposeDirectoryData bool } type FilerServer struct { inFlightDataSize int64 + inFlightUploads int64 listenersWaits int64 // notifying clients diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index dcfc8e3ed..a2eab9365 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -95,14 +95,28 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { contentLength := getContentLength(r) fs.inFlightDataLimitCond.L.Lock() inFlightDataSize := atomic.LoadInt64(&fs.inFlightDataSize) - for fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit { - glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit) + inFlightUploads := atomic.LoadInt64(&fs.inFlightUploads) + + // Wait if either data size limit or file count limit is exceeded + for (fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit) || (fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit) { + if (fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit) { + glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit) + } + if (fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit) { + glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, fs.option.ConcurrentFileUploadLimit) + } fs.inFlightDataLimitCond.Wait() inFlightDataSize = atomic.LoadInt64(&fs.inFlightDataSize) + inFlightUploads = atomic.LoadInt64(&fs.inFlightUploads) } fs.inFlightDataLimitCond.L.Unlock() + + // Increment counters + atomic.AddInt64(&fs.inFlightUploads, 1) atomic.AddInt64(&fs.inFlightDataSize, contentLength) defer func() { + // Decrement counters + atomic.AddInt64(&fs.inFlightUploads, -1) atomic.AddInt64(&fs.inFlightDataSize, -contentLength) fs.inFlightDataLimitCond.Signal() }() |
