about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  weed/command/filer.go               107
-rw-r--r--  weed/command/s3.go                    6
-rw-r--r--  weed/command/server.go                3
-rw-r--r--  weed/s3api/s3api_circuit_breaker.go  43
-rw-r--r--  weed/s3api/s3api_server.go           52
-rw-r--r--  weed/s3api/s3err/s3api_errors.go      4
-rw-r--r--  weed/server/filer_server.go          46
-rw-r--r--  weed/server/filer_server_handlers.go 18
8 files changed, 181 insertions(+), 98 deletions(-)
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 053c5a147..86991a181 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -42,38 +42,39 @@ var (
)
type FilerOptions struct {
- masters *pb.ServerDiscovery
- mastersString *string
- ip *string
- bindIp *string
- port *int
- portGrpc *int
- publicPort *int
- filerGroup *string
- collection *string
- defaultReplicaPlacement *string
- disableDirListing *bool
- maxMB *int
- dirListingLimit *int
- dataCenter *string
- rack *string
- enableNotification *bool
- disableHttp *bool
- cipher *bool
- metricsHttpPort *int
- metricsHttpIp *string
- saveToFilerLimit *int
- defaultLevelDbDirectory *string
- concurrentUploadLimitMB *int
- debug *bool
- debugPort *int
- localSocket *string
- showUIDirectoryDelete *bool
- downloadMaxMBps *int
- diskType *string
- allowedOrigins *string
- exposeDirectoryData *bool
- certProvider certprovider.Provider
+ masters *pb.ServerDiscovery
+ mastersString *string
+ ip *string
+ bindIp *string
+ port *int
+ portGrpc *int
+ publicPort *int
+ filerGroup *string
+ collection *string
+ defaultReplicaPlacement *string
+ disableDirListing *bool
+ maxMB *int
+ dirListingLimit *int
+ dataCenter *string
+ rack *string
+ enableNotification *bool
+ disableHttp *bool
+ cipher *bool
+ metricsHttpPort *int
+ metricsHttpIp *string
+ saveToFilerLimit *int
+ defaultLevelDbDirectory *string
+ concurrentUploadLimitMB *int
+ concurrentFileUploadLimit *int
+ debug *bool
+ debugPort *int
+ localSocket *string
+ showUIDirectoryDelete *bool
+ downloadMaxMBps *int
+ diskType *string
+ allowedOrigins *string
+ exposeDirectoryData *bool
+ certProvider certprovider.Provider
}
func init() {
@@ -99,6 +100,7 @@ func init() {
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
+ f.concurrentFileUploadLimit = cmdFiler.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited")
f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
@@ -127,6 +129,8 @@ func init() {
filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate")
filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.")
filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 10, "connection idle seconds")
+ filerS3Options.concurrentUploadLimitMB = cmdFiler.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3")
+ filerS3Options.concurrentFileUploadLimit = cmdFiler.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")
// start webdav on filer
filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway")
@@ -310,25 +314,26 @@ func (fo *FilerOptions) startFiler() {
filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc)
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
- Masters: fo.masters,
- FilerGroup: *fo.filerGroup,
- Collection: *fo.collection,
- DefaultReplication: *fo.defaultReplicaPlacement,
- DisableDirListing: *fo.disableDirListing,
- MaxMB: *fo.maxMB,
- DirListingLimit: *fo.dirListingLimit,
- DataCenter: *fo.dataCenter,
- Rack: *fo.rack,
- DefaultLevelDbDir: defaultLevelDbDirectory,
- DisableHttp: *fo.disableHttp,
- Host: filerAddress,
- Cipher: *fo.cipher,
- SaveToFilerLimit: int64(*fo.saveToFilerLimit),
- ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
- ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
- DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024,
- DiskType: *fo.diskType,
- AllowedOrigins: strings.Split(*fo.allowedOrigins, ","),
+ Masters: fo.masters,
+ FilerGroup: *fo.filerGroup,
+ Collection: *fo.collection,
+ DefaultReplication: *fo.defaultReplicaPlacement,
+ DisableDirListing: *fo.disableDirListing,
+ MaxMB: *fo.maxMB,
+ DirListingLimit: *fo.dirListingLimit,
+ DataCenter: *fo.dataCenter,
+ Rack: *fo.rack,
+ DefaultLevelDbDir: defaultLevelDbDirectory,
+ DisableHttp: *fo.disableHttp,
+ Host: filerAddress,
+ Cipher: *fo.cipher,
+ SaveToFilerLimit: int64(*fo.saveToFilerLimit),
+ ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
+ ConcurrentFileUploadLimit: int64(*fo.concurrentFileUploadLimit),
+ ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
+ DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024,
+ DiskType: *fo.diskType,
+ AllowedOrigins: strings.Split(*fo.allowedOrigins, ","),
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 995d15f8a..61222336b 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -57,6 +57,8 @@ type S3Options struct {
localSocket *string
certProvider certprovider.Provider
idleTimeout *int
+ concurrentUploadLimitMB *int
+ concurrentFileUploadLimit *int
}
func init() {
@@ -83,6 +85,8 @@ func init() {
s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path")
s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 10, "connection idle seconds")
+ s3StandaloneOptions.concurrentUploadLimitMB = cmdS3.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
+ s3StandaloneOptions.concurrentFileUploadLimit = cmdS3.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited")
}
var cmdS3 = &Command{
@@ -275,6 +279,8 @@ func (s3opt *S3Options) startS3Server() bool {
DataCenter: *s3opt.dataCenter,
FilerGroup: filerGroup,
IamConfig: iamConfigPath, // Advanced IAM config (optional)
+ ConcurrentUploadLimit: int64(*s3opt.concurrentUploadLimitMB) * 1024 * 1024,
+ ConcurrentFileUploadLimit: int64(*s3opt.concurrentFileUploadLimit),
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/server.go b/weed/command/server.go
index 3cdde48c6..47df30fc2 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -123,6 +123,7 @@ func init() {
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
+ filerOptions.concurrentFileUploadLimit = cmdServer.Flag.Int("filer.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited")
filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
filerOptions.showUIDirectoryDelete = cmdServer.Flag.Bool("filer.ui.deleteDir", true, "enable filer UI show delete directory button")
filerOptions.downloadMaxMBps = cmdServer.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second")
@@ -168,6 +169,8 @@ func init() {
s3Options.localSocket = cmdServer.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
s3Options.bindIp = cmdServer.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.")
s3Options.idleTimeout = cmdServer.Flag.Int("s3.idleTimeout", 10, "connection idle seconds")
+ s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3")
+ s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")
sftpOptions.port = cmdServer.Flag.Int("sftp.port", 2022, "SFTP server listen port")
sftpOptions.sshPrivateKey = cmdServer.Flag.String("sftp.sshPrivateKey", "", "path to the SSH private key file for host authentication")
diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go
index 2f5e1f580..3c4f55a23 100644
--- a/weed/s3api/s3api_circuit_breaker.go
+++ b/weed/s3api/s3api_circuit_breaker.go
@@ -21,6 +21,7 @@ type CircuitBreaker struct {
Enabled bool
counters map[string]*int64
limitations map[string]int64
+ s3a *S3ApiServer
}
func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
@@ -89,6 +90,48 @@ func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerCo
func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) {
return func(w http.ResponseWriter, r *http.Request) {
+ // Apply upload limiting for write actions if configured
+ if cb.s3a != nil && (action == s3_constants.ACTION_WRITE) &&
+ (cb.s3a.option.ConcurrentUploadLimit != 0 || cb.s3a.option.ConcurrentFileUploadLimit != 0) {
+
+ // Get content length, default to 0 if not provided
+ contentLength := r.ContentLength
+ if contentLength < 0 {
+ contentLength = 0
+ }
+
+ // Wait until in flight data is less than the limit
+ cb.s3a.inFlightDataLimitCond.L.Lock()
+ inFlightDataSize := atomic.LoadInt64(&cb.s3a.inFlightDataSize)
+ inFlightUploads := atomic.LoadInt64(&cb.s3a.inFlightUploads)
+
+ // Wait if either data size limit or file count limit is exceeded
+ for (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) ||
+ (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) {
+ if (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) {
+ glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, cb.s3a.option.ConcurrentUploadLimit)
+ }
+ if (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) {
+ glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, cb.s3a.option.ConcurrentFileUploadLimit)
+ }
+ cb.s3a.inFlightDataLimitCond.Wait()
+ inFlightDataSize = atomic.LoadInt64(&cb.s3a.inFlightDataSize)
+ inFlightUploads = atomic.LoadInt64(&cb.s3a.inFlightUploads)
+ }
+ cb.s3a.inFlightDataLimitCond.L.Unlock()
+
+ // Increment counters
+ atomic.AddInt64(&cb.s3a.inFlightUploads, 1)
+ atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength)
+ defer func() {
+ // Decrement counters
+ atomic.AddInt64(&cb.s3a.inFlightUploads, -1)
+ atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength)
+ cb.s3a.inFlightDataLimitCond.Signal()
+ }()
+ }
+
+ // Apply circuit breaker logic
if !cb.Enabled {
f(w, r)
return
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index dcf3a55f2..a1a3f100b 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -9,6 +9,7 @@ import (
"os"
"slices"
"strings"
+ "sync"
"time"
"github.com/gorilla/mux"
@@ -48,22 +49,27 @@ type S3ApiServerOption struct {
DataCenter string
FilerGroup string
IamConfig string // Advanced IAM configuration file path
+ ConcurrentUploadLimit int64
+ ConcurrentFileUploadLimit int64
}
type S3ApiServer struct {
s3_pb.UnimplementedSeaweedS3Server
- option *S3ApiServerOption
- iam *IdentityAccessManagement
- iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication
- cb *CircuitBreaker
- randomClientId int32
- filerGuard *security.Guard
- filerClient *wdclient.FilerClient
- client util_http_client.HTTPClientInterface
- bucketRegistry *BucketRegistry
- credentialManager *credential.CredentialManager
- bucketConfigCache *BucketConfigCache
- policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies
+ option *S3ApiServerOption
+ iam *IdentityAccessManagement
+ iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication
+ cb *CircuitBreaker
+ randomClientId int32
+ filerGuard *security.Guard
+ filerClient *wdclient.FilerClient
+ client util_http_client.HTTPClientInterface
+ bucketRegistry *BucketRegistry
+ credentialManager *credential.CredentialManager
+ bucketConfigCache *BucketConfigCache
+ policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies
+ inFlightDataSize int64
+ inFlightUploads int64
+ inFlightDataLimitCond *sync.Cond
}
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
@@ -135,17 +141,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
}
s3ApiServer = &S3ApiServer{
- option: option,
- iam: iam,
- randomClientId: util.RandomInt32(),
- filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
- filerClient: filerClient,
- cb: NewCircuitBreaker(option),
- credentialManager: iam.credentialManager,
- bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
- policyEngine: policyEngine, // Initialize bucket policy engine
+ option: option,
+ iam: iam,
+ randomClientId: util.RandomInt32(),
+ filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
+ filerClient: filerClient,
+ cb: NewCircuitBreaker(option),
+ credentialManager: iam.credentialManager,
+ bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
+ policyEngine: policyEngine, // Initialize bucket policy engine
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
+ // Set s3a reference in circuit breaker for upload limiting
+ s3ApiServer.cb.s3a = s3ApiServer
+
// Pass policy engine to IAM for bucket policy evaluation
// This avoids circular dependency by not passing the entire S3ApiServer
iam.policyEngine = policyEngine
diff --git a/weed/s3api/s3err/s3api_errors.go b/weed/s3api/s3err/s3api_errors.go
index 762289bce..189c6ba86 100644
--- a/weed/s3api/s3err/s3api_errors.go
+++ b/weed/s3api/s3err/s3api_errors.go
@@ -498,12 +498,12 @@ var errorCodeResponse = map[ErrorCode]APIError{
ErrTooManyRequest: {
Code: "ErrTooManyRequest",
Description: "Too many simultaneous request count",
- HTTPStatusCode: http.StatusTooManyRequests,
+ HTTPStatusCode: http.StatusServiceUnavailable,
},
ErrRequestBytesExceed: {
Code: "ErrRequestBytesExceed",
Description: "Simultaneous request bytes exceed limitations",
- HTTPStatusCode: http.StatusTooManyRequests,
+ HTTPStatusCode: http.StatusServiceUnavailable,
},
OwnershipControlsNotFoundError: {
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 3d08c0980..95d344af4 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -56,32 +56,34 @@ import (
)
type FilerOption struct {
- Masters *pb.ServerDiscovery
- FilerGroup string
- Collection string
- DefaultReplication string
- DisableDirListing bool
- MaxMB int
- DirListingLimit int
- DataCenter string
- Rack string
- DataNode string
- DefaultLevelDbDir string
- DisableHttp bool
- Host pb.ServerAddress
- recursiveDelete bool
- Cipher bool
- SaveToFilerLimit int64
- ConcurrentUploadLimit int64
- ShowUIDirectoryDelete bool
- DownloadMaxBytesPs int64
- DiskType string
- AllowedOrigins []string
- ExposeDirectoryData bool
+ Masters *pb.ServerDiscovery
+ FilerGroup string
+ Collection string
+ DefaultReplication string
+ DisableDirListing bool
+ MaxMB int
+ DirListingLimit int
+ DataCenter string
+ Rack string
+ DataNode string
+ DefaultLevelDbDir string
+ DisableHttp bool
+ Host pb.ServerAddress
+ recursiveDelete bool
+ Cipher bool
+ SaveToFilerLimit int64
+ ConcurrentUploadLimit int64
+ ConcurrentFileUploadLimit int64
+ ShowUIDirectoryDelete bool
+ DownloadMaxBytesPs int64
+ DiskType string
+ AllowedOrigins []string
+ ExposeDirectoryData bool
}
type FilerServer struct {
inFlightDataSize int64
+ inFlightUploads int64
listenersWaits int64
// notifying clients
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index dcfc8e3ed..a2eab9365 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -95,14 +95,28 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
contentLength := getContentLength(r)
fs.inFlightDataLimitCond.L.Lock()
inFlightDataSize := atomic.LoadInt64(&fs.inFlightDataSize)
- for fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit {
- glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit)
+ inFlightUploads := atomic.LoadInt64(&fs.inFlightUploads)
+
+ // Wait if either data size limit or file count limit is exceeded
+ for (fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit) || (fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit) {
+ if (fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit) {
+ glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit)
+ }
+ if (fs.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= fs.option.ConcurrentFileUploadLimit) {
+ glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, fs.option.ConcurrentFileUploadLimit)
+ }
fs.inFlightDataLimitCond.Wait()
inFlightDataSize = atomic.LoadInt64(&fs.inFlightDataSize)
+ inFlightUploads = atomic.LoadInt64(&fs.inFlightUploads)
}
fs.inFlightDataLimitCond.L.Unlock()
+
+ // Increment counters
+ atomic.AddInt64(&fs.inFlightUploads, 1)
atomic.AddInt64(&fs.inFlightDataSize, contentLength)
defer func() {
+ // Decrement counters
+ atomic.AddInt64(&fs.inFlightUploads, -1)
atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
fs.inFlightDataLimitCond.Signal()
}()