aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/s3api/s3api_circuit_breaker.go17
-rw-r--r--weed/s3api/s3api_circuit_breaker_test.go38
2 files changed, 33 insertions, 22 deletions
diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go
index 7c8311d21..111b404c7 100644
--- a/weed/s3api/s3api_circuit_breaker.go
+++ b/weed/s3api/s3api_circuit_breaker.go
@@ -16,7 +16,7 @@ import (
)
type CircuitBreaker struct {
- sync.Mutex
+ sync.RWMutex
Enabled bool
counters map[string]*int64
limitations map[string]int64
@@ -110,7 +110,7 @@ func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request),
func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) {
//bucket simultaneous request count
- bucketCountRollBack, errCode := cb.loadCounterAndCompare(bucket, action, s3_constants.LimitTypeCount, 1, s3err.ErrTooManyRequest)
+ bucketCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest)
if bucketCountRollBack != nil {
rollback = append(rollback, bucketCountRollBack)
}
@@ -119,7 +119,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (
}
//bucket simultaneous request content bytes
- bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(bucket, action, s3_constants.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed)
+ bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed)
if bucketContentLengthRollBack != nil {
rollback = append(rollback, bucketContentLengthRollBack)
}
@@ -128,7 +128,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (
}
//global simultaneous request count
- globalCountRollBack, errCode := cb.loadCounterAndCompare("", action, s3_constants.LimitTypeCount, 1, s3err.ErrTooManyRequest)
+ globalCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest)
if globalCountRollBack != nil {
rollback = append(rollback, globalCountRollBack)
}
@@ -137,7 +137,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (
}
//global simultaneous request content bytes
- globalContentLengthRollBack, errCode := cb.loadCounterAndCompare("", action, s3_constants.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed)
+ globalContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed)
if globalContentLengthRollBack != nil {
rollback = append(rollback, globalContentLengthRollBack)
}
@@ -147,11 +147,13 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (
return
}
-func (cb *CircuitBreaker) loadCounterAndCompare(bucket, action, limitType string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) {
- key := s3_constants.Concat(bucket, action, limitType)
+func (cb *CircuitBreaker) loadCounterAndCompare(key string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) {
e = s3err.ErrNone
if max, ok := cb.limitations[key]; ok {
+ cb.RLock()
counter, exists := cb.counters[key]
+ cb.RUnlock()
+
if !exists {
cb.Lock()
counter, exists = cb.counters[key]
@@ -171,7 +173,6 @@ func (cb *CircuitBreaker) loadCounterAndCompare(bucket, action, limitType string
f = func() {
atomic.AddInt64(counter, -inc)
}
- current = atomic.LoadInt64(counter)
if current > max {
e = errCode
return
diff --git a/weed/s3api/s3api_circuit_breaker_test.go b/weed/s3api/s3api_circuit_breaker_test.go
index f795b75fc..5848cf164 100644
--- a/weed/s3api/s3api_circuit_breaker_test.go
+++ b/weed/s3api/s3api_circuit_breaker_test.go
@@ -11,28 +11,38 @@ import (
)
type TestLimitCase struct {
- actionName string
+ actionName string
+
limitType string
bucketLimitValue int64
globalLimitValue int64
routineCount int
- reqBytes int64
-
successCount int64
}
var (
bucket = "/test"
- action = s3_constants.ACTION_READ
+ action = s3_constants.ACTION_WRITE
+ fileSize int64 = 200
+
TestLimitCases = []*TestLimitCase{
- {action, s3_constants.LimitTypeCount, 5, 5, 6, 1024, 5},
- {action, s3_constants.LimitTypeCount, 6, 6, 6, 1024, 6},
- {action, s3_constants.LimitTypeCount, 5, 6, 6, 1024, 5},
- {action, s3_constants.LimitTypeBytes, 1024, 1024, 6, 200, 5},
- {action, s3_constants.LimitTypeBytes, 1200, 1200, 6, 200, 6},
- {action, s3_constants.LimitTypeBytes, 11990, 11990, 60, 200, 59},
- {action, s3_constants.LimitTypeBytes, 11790, 11990, 70, 200, 58},
+
+ //bucket-LimitTypeCount
+ {action, s3_constants.LimitTypeCount, 5, 6, 60, 5},
+ {action, s3_constants.LimitTypeCount, 0, 6, 6, 0},
+
+ //global-LimitTypeCount
+ {action, s3_constants.LimitTypeCount, 6, 5, 6, 5},
+ {action, s3_constants.LimitTypeCount, 6, 0, 6, 0},
+
+ //bucket-LimitTypeBytes
+ {action, s3_constants.LimitTypeBytes, 1000, 1020, 6, 5},
+ {action, s3_constants.LimitTypeBytes, 0, 1020, 6, 0},
+
+ //global-LimitTypeBytes
+ {action, s3_constants.LimitTypeBytes, 1020, 1000, 6, 5},
+ {action, s3_constants.LimitTypeBytes, 1020, 0, 6, 0},
}
)
@@ -64,14 +74,14 @@ func TestLimit(t *testing.T) {
t.Fatal(err)
}
- successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: tc.reqBytes})
+ successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: fileSize}, tc.actionName)
if successCount != tc.successCount {
- t.Errorf("successCount not equal, expect=%d, actual=%d", tc.successCount, successCount)
+ t.Errorf("successCount not equal, expect=%d, actual=%d, case: %v", tc.successCount, successCount, tc)
}
}
}
-func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request) int64 {
+func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request, action string) int64 {
var successCounter int64
resultCh := make(chan []func(), routineCount)
var wg sync.WaitGroup