diff options
Diffstat (limited to 'weed/s3api/s3api_circuit_breaker.go')
| -rw-r--r-- | weed/s3api/s3api_circuit_breaker.go | 43 |
1 files changed, 43 insertions, 0 deletions
diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 2f5e1f580..3c4f55a23 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -21,6 +21,7 @@ type CircuitBreaker struct { Enabled bool counters map[string]*int64 limitations map[string]int64 + s3a *S3ApiServer } func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { @@ -89,6 +90,48 @@ func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerCo func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) { return func(w http.ResponseWriter, r *http.Request) { + // Apply upload limiting for write actions if configured + if cb.s3a != nil && (action == s3_constants.ACTION_WRITE) && + (cb.s3a.option.ConcurrentUploadLimit != 0 || cb.s3a.option.ConcurrentFileUploadLimit != 0) { + + // Get content length, default to 0 if not provided + contentLength := r.ContentLength + if contentLength < 0 { + contentLength = 0 + } + + // Wait until in flight data is less than the limit + cb.s3a.inFlightDataLimitCond.L.Lock() + inFlightDataSize := atomic.LoadInt64(&cb.s3a.inFlightDataSize) + inFlightUploads := atomic.LoadInt64(&cb.s3a.inFlightUploads) + + // Wait if either data size limit or file count limit is exceeded + for (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) || + (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) { + if (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) { + glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, cb.s3a.option.ConcurrentUploadLimit) + } + if (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) { + glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, cb.s3a.option.ConcurrentFileUploadLimit) + } + cb.s3a.inFlightDataLimitCond.Wait() + inFlightDataSize = atomic.LoadInt64(&cb.s3a.inFlightDataSize) + inFlightUploads = atomic.LoadInt64(&cb.s3a.inFlightUploads) + } + cb.s3a.inFlightDataLimitCond.L.Unlock() + + // Increment counters + atomic.AddInt64(&cb.s3a.inFlightUploads, 1) + atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength) + defer func() { + // Decrement counters + atomic.AddInt64(&cb.s3a.inFlightUploads, -1) + atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength) + cb.s3a.inFlightDataLimitCond.Signal() + }() + } + + // Apply circuit breaker logic if !cb.Enabled { f(w, r) return |
