aboutsummaryrefslogtreecommitdiff
path: root/weed/util/limiter.go
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
committerbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
commitd861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/util/limiter.go
parent70da715d8d917527291b35fb069fac077d17b868 (diff)
parent4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
downloadseaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz
seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/util/limiter.go')
-rw-r--r--weed/util/limiter.go114
1 files changed, 114 insertions, 0 deletions
diff --git a/weed/util/limiter.go b/weed/util/limiter.go
new file mode 100644
index 000000000..2debaaa85
--- /dev/null
+++ b/weed/util/limiter.go
@@ -0,0 +1,114 @@
+package util
+
+import (
+ "math/rand"
+ "reflect"
+ "sync"
+ "sync/atomic"
+)
+
+// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
+
+// LimitedConcurrentExecutor object
+type LimitedConcurrentExecutor struct {
+ limit int
+ tokenChan chan int
+}
+
+func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
+
+ // allocate a limiter instance
+ c := &LimitedConcurrentExecutor{
+ limit: limit,
+ tokenChan: make(chan int, limit),
+ }
+
+ // allocate the tokenChan:
+ for i := 0; i < c.limit; i++ {
+ c.tokenChan <- i
+ }
+
+ return c
+}
+
+// Execute adds a function to the execution queue.
+// if num of go routines allocated by this instance is < limit
+// launch a new go routine to execute job
+// else wait until a go routine becomes available
+func (c *LimitedConcurrentExecutor) Execute(job func()) {
+ token := <-c.tokenChan
+ go func() {
+ defer func() {
+ c.tokenChan <- token
+ }()
+ // run the job
+ job()
+ }()
+}
+
+// a different implementation, but somehow more "conservative"
+type OperationRequest func()
+
+type LimitedOutOfOrderProcessor struct {
+ processorSlots uint32
+ processors []chan OperationRequest
+ processorLimit int32
+ processorLimitCond *sync.Cond
+ currentProcessor int32
+}
+
+func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
+
+ processorSlots := uint32(32)
+ c = &LimitedOutOfOrderProcessor{
+ processorSlots: processorSlots,
+ processors: make([]chan OperationRequest, processorSlots),
+ processorLimit: limit,
+ processorLimitCond: sync.NewCond(new(sync.Mutex)),
+ }
+
+ for i := 0; i < int(processorSlots); i++ {
+ c.processors[i] = make(chan OperationRequest)
+ }
+
+ cases := make([]reflect.SelectCase, processorSlots)
+ for i, ch := range c.processors {
+ cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+ }
+
+ go func() {
+ for {
+ _, value, ok := reflect.Select(cases)
+ if !ok {
+ continue
+ }
+
+ request := value.Interface().(OperationRequest)
+
+ if c.processorLimit > 0 {
+ c.processorLimitCond.L.Lock()
+ for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
+ c.processorLimitCond.Wait()
+ }
+ atomic.AddInt32(&c.currentProcessor, 1)
+ c.processorLimitCond.L.Unlock()
+ }
+
+ go func() {
+ if c.processorLimit > 0 {
+ defer atomic.AddInt32(&c.currentProcessor, -1)
+ defer c.processorLimitCond.Signal()
+ }
+ request()
+ }()
+
+ }
+ }()
+
+ return c
+}
+
+func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
+ index := rand.Uint32() % c.processorSlots
+ c.processors[index] <- request
+}