diff options
Diffstat (limited to 'weed/util/limiter.go')
| -rw-r--r-- | weed/util/limiter.go | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/weed/util/limiter.go b/weed/util/limiter.go index 91499632c..2debaaa85 100644 --- a/weed/util/limiter.go +++ b/weed/util/limiter.go @@ -1,5 +1,12 @@ package util +import ( + "math/rand" + "reflect" + "sync" + "sync/atomic" +) + // initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go // LimitedConcurrentExecutor object @@ -38,3 +45,70 @@ func (c *LimitedConcurrentExecutor) Execute(job func()) { job() }() } + +// a different implementation, but somehow more "conservative" +type OperationRequest func() + +type LimitedOutOfOrderProcessor struct { + processorSlots uint32 + processors []chan OperationRequest + processorLimit int32 + processorLimitCond *sync.Cond + currentProcessor int32 +} + +func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) { + + processorSlots := uint32(32) + c = &LimitedOutOfOrderProcessor{ + processorSlots: processorSlots, + processors: make([]chan OperationRequest, processorSlots), + processorLimit: limit, + processorLimitCond: sync.NewCond(new(sync.Mutex)), + } + + for i := 0; i < int(processorSlots); i++ { + c.processors[i] = make(chan OperationRequest) + } + + cases := make([]reflect.SelectCase, processorSlots) + for i, ch := range c.processors { + cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} + } + + go func() { + for { + _, value, ok := reflect.Select(cases) + if !ok { + continue + } + + request := value.Interface().(OperationRequest) + + if c.processorLimit > 0 { + c.processorLimitCond.L.Lock() + for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit { + c.processorLimitCond.Wait() + } + atomic.AddInt32(&c.currentProcessor, 1) + c.processorLimitCond.L.Unlock() + } + + go func() { + if c.processorLimit > 0 { + defer atomic.AddInt32(&c.currentProcessor, -1) + defer c.processorLimitCond.Signal() + } + request() + }() + + } + }() + + return c +} + +func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) { + index := rand.Uint32() % c.processorSlots + c.processors[index] <- request +} |
