aboutsummaryrefslogtreecommitdiff
path: root/weed/util/limiter.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/limiter.go')
-rw-r--r--weed/util/limiter.go74
1 files changed, 74 insertions, 0 deletions
diff --git a/weed/util/limiter.go b/weed/util/limiter.go
index 91499632c..2debaaa85 100644
--- a/weed/util/limiter.go
+++ b/weed/util/limiter.go
@@ -1,5 +1,12 @@
package util
+import (
+ "math/rand"
+ "reflect"
+ "sync"
+ "sync/atomic"
+)
+
// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
// LimitedConcurrentExecutor object
@@ -38,3 +45,70 @@ func (c *LimitedConcurrentExecutor) Execute(job func()) {
job()
}()
}
+
+// a different implementation, but somehow more "conservative"
+type OperationRequest func()
+
+type LimitedOutOfOrderProcessor struct {
+ processorSlots uint32
+ processors []chan OperationRequest
+ processorLimit int32
+ processorLimitCond *sync.Cond
+ currentProcessor int32
+}
+
+func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
+
+ processorSlots := uint32(32)
+ c = &LimitedOutOfOrderProcessor{
+ processorSlots: processorSlots,
+ processors: make([]chan OperationRequest, processorSlots),
+ processorLimit: limit,
+ processorLimitCond: sync.NewCond(new(sync.Mutex)),
+ }
+
+ for i := 0; i < int(processorSlots); i++ {
+ c.processors[i] = make(chan OperationRequest)
+ }
+
+ cases := make([]reflect.SelectCase, processorSlots)
+ for i, ch := range c.processors {
+ cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+ }
+
+ go func() {
+ for {
+ _, value, ok := reflect.Select(cases)
+ if !ok {
+ continue
+ }
+
+ request := value.Interface().(OperationRequest)
+
+ if c.processorLimit > 0 {
+ c.processorLimitCond.L.Lock()
+ for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
+ c.processorLimitCond.Wait()
+ }
+ atomic.AddInt32(&c.currentProcessor, 1)
+ c.processorLimitCond.L.Unlock()
+ }
+
+ go func() {
+ if c.processorLimit > 0 {
+ defer atomic.AddInt32(&c.currentProcessor, -1)
+ defer c.processorLimitCond.Signal()
+ }
+ request()
+ }()
+
+ }
+ }()
+
+ return c
+}
+
+func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
+ index := rand.Uint32() % c.processorSlots
+ c.processors[index] <- request
+}