diff options
| author | Chris Lu <chris.lu@uber.com> | 2021-04-02 01:10:24 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@uber.com> | 2021-04-02 01:10:24 -0700 |
| commit | 7e8edc3c4adaf1251f5c773cbeaf3a868269f97a (patch) | |
| tree | ba87171aee996af75425171db7fd60cae041a387 /weed/util/limiter.go | |
| parent | cc0df36a9e767b27e313885a79119e6b14c3cbad (diff) | |
| download | seaweedfs-7e8edc3c4adaf1251f5c773cbeaf3a868269f97a.tar.xz seaweedfs-7e8edc3c4adaf1251f5c773cbeaf3a868269f97a.zip | |
refactoring
Diffstat (limited to 'weed/util/limiter.go')
| -rw-r--r-- | weed/util/limiter.go | 82 |
1 files changed, 56 insertions, 26 deletions
diff --git a/weed/util/limiter.go b/weed/util/limiter.go index 91499632c..2e5168d3d 100644 --- a/weed/util/limiter.go +++ b/weed/util/limiter.go @@ -1,40 +1,70 @@ package util -// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go +import ( + "math/rand" + "reflect" + "sync" + "sync/atomic" +) -// LimitedConcurrentExecutor object -type LimitedConcurrentExecutor struct { - limit int - tokenChan chan int +type OperationRequest func() + +type LimitedOutOfOrderProcessor struct { + processorSlots uint32 + processors []chan OperationRequest + processorLimit int32 + processorLimitCond *sync.Cond + currentProcessor int32 } -func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor { +func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) { - // allocate a limiter instance - c := &LimitedConcurrentExecutor{ - limit: limit, - tokenChan: make(chan int, limit), + processorSlots := uint32(32) + c = &LimitedOutOfOrderProcessor{ + processorSlots: processorSlots, + processors: make([]chan OperationRequest, processorSlots), + processorLimit: limit, + processorLimitCond: sync.NewCond(new(sync.Mutex)), } - // allocate the tokenChan: - for i := 0; i < c.limit; i++ { - c.tokenChan <- i + for i := 0; i < int(processorSlots); i++ { + c.processors[i] = make(chan OperationRequest) } - return c -} + cases := make([]reflect.SelectCase, processorSlots) + for i, ch := range c.processors { + cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} + } -// Execute adds a function to the execution queue. -// if num of go routines allocated by this instance is < limit -// launch a new go routine to execute job -// else wait until a go routine becomes available -func (c *LimitedConcurrentExecutor) Execute(job func()) { - token := <-c.tokenChan go func() { - defer func() { - c.tokenChan <- token - }() - // run the job - job() + for { + _, value, ok := reflect.Select(cases) + if !ok { + continue + } + + request := value.Interface().(OperationRequest) + + c.processorLimitCond.L.Lock() + for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit { + c.processorLimitCond.Wait() + } + atomic.AddInt32(&c.currentProcessor, 1) + c.processorLimitCond.L.Unlock() + + go func() { + defer atomic.AddInt32(&c.currentProcessor, -1) + defer c.processorLimitCond.Signal() + request() + }() + + } }() + + return c +} + +func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) { + index := rand.Uint32() % c.processorSlots + c.processors[index] <- request } |
