diff options
Diffstat (limited to 'weed/util/limiter.go')
| -rw-r--r-- | weed/util/limiter.go | 114 |
1 files changed, 0 insertions, 114 deletions
diff --git a/weed/util/limiter.go b/weed/util/limiter.go deleted file mode 100644 index 9d63c12a1..000000000 --- a/weed/util/limiter.go +++ /dev/null @@ -1,114 +0,0 @@ -package util - -import ( - "math/rand" - "reflect" - "sync" - "sync/atomic" -) - -// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go - -// LimitedConcurrentExecutor object -type LimitedConcurrentExecutor struct { - limit int - tokenChan chan int -} - -func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor { - - // allocate a limiter instance - c := &LimitedConcurrentExecutor{ - limit: limit, - tokenChan: make(chan int, limit), - } - - // allocate the tokenChan: - for i := 0; i < c.limit; i++ { - c.tokenChan <- i - } - - return c -} - -// Execute adds a function to the execution queue. -// if num of go routines allocated by this instance is < limit -// launch a new go routine to execute job -// else wait until a go routine becomes available -func (c *LimitedConcurrentExecutor) Execute(job func()) { - token := <-c.tokenChan - go func() { - defer func() { - c.tokenChan <- token - }() - // run the job - job() - }() -} - -// a different implementation, but somehow more "conservative" -type OperationRequest func() - -type LimitedOutOfOrderProcessor struct { - processorLimit int32 - processorLimitCond *sync.Cond - processorSlots uint32 - processors []chan OperationRequest - currentProcessor int32 -} - -func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) { - - processorSlots := uint32(32) - c = &LimitedOutOfOrderProcessor{ - processorSlots: processorSlots, - processors: make([]chan OperationRequest, processorSlots), - processorLimit: limit, - processorLimitCond: sync.NewCond(new(sync.Mutex)), - } - - for i := 0; i < int(processorSlots); i++ { - c.processors[i] = make(chan OperationRequest) - } - - cases := make([]reflect.SelectCase, processorSlots) - for i, ch := range c.processors { - cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} - } - - go func() { - for { - _, value, ok := reflect.Select(cases) - if !ok { - continue - } - - request := value.Interface().(OperationRequest) - - if c.processorLimit > 0 { - c.processorLimitCond.L.Lock() - for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit { - c.processorLimitCond.Wait() - } - atomic.AddInt32(&c.currentProcessor, 1) - c.processorLimitCond.L.Unlock() - } - - go func() { - if c.processorLimit > 0 { - defer atomic.AddInt32(&c.currentProcessor, -1) - defer c.processorLimitCond.Signal() - } - request() - }() - - } - }() - - return c -} - -func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) { - index := rand.Uint32() % c.processorSlots - c.processors[index] <- request -} |
