aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_circuit_breaker.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_circuit_breaker.go')
-rw-r--r--weed/s3api/s3api_circuit_breaker.go43
1 files changed, 43 insertions, 0 deletions
diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go
index 2f5e1f580..3c4f55a23 100644
--- a/weed/s3api/s3api_circuit_breaker.go
+++ b/weed/s3api/s3api_circuit_breaker.go
@@ -21,6 +21,7 @@ type CircuitBreaker struct {
Enabled bool
counters map[string]*int64
limitations map[string]int64
+ s3a *S3ApiServer
}
func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
@@ -89,6 +90,48 @@ func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerCo
func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) {
return func(w http.ResponseWriter, r *http.Request) {
+ // Apply upload limiting for write actions if configured
+ if cb.s3a != nil && (action == s3_constants.ACTION_WRITE) &&
+ (cb.s3a.option.ConcurrentUploadLimit != 0 || cb.s3a.option.ConcurrentFileUploadLimit != 0) {
+
+ // Get content length, default to 0 if not provided
+ contentLength := r.ContentLength
+ if contentLength < 0 {
+ contentLength = 0
+ }
+
+ // Wait until in flight data is less than the limit
+ cb.s3a.inFlightDataLimitCond.L.Lock()
+ inFlightDataSize := atomic.LoadInt64(&cb.s3a.inFlightDataSize)
+ inFlightUploads := atomic.LoadInt64(&cb.s3a.inFlightUploads)
+
+ // Wait if either data size limit or file count limit is exceeded
+ for (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) ||
+ (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) {
+ if (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) {
+ glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, cb.s3a.option.ConcurrentUploadLimit)
+ }
+ if (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) {
+ glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, cb.s3a.option.ConcurrentFileUploadLimit)
+ }
+ cb.s3a.inFlightDataLimitCond.Wait()
+ inFlightDataSize = atomic.LoadInt64(&cb.s3a.inFlightDataSize)
+ inFlightUploads = atomic.LoadInt64(&cb.s3a.inFlightUploads)
+ }
+ cb.s3a.inFlightDataLimitCond.L.Unlock()
+
+ // Increment counters
+ atomic.AddInt64(&cb.s3a.inFlightUploads, 1)
+ atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength)
+ defer func() {
+ // Decrement counters
+ atomic.AddInt64(&cb.s3a.inFlightUploads, -1)
+ atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength)
+ cb.s3a.inFlightDataLimitCond.Signal()
+ }()
+ }
+
+ // Apply circuit breaker logic
if !cb.Enabled {
f(w, r)
return