aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-09-25 12:59:38 -0700
committerchrislu <chris.lu@gmail.com>2022-09-25 12:59:38 -0700
commite31fdbc89b04d61e9498333a44ec3d9110ad5bba (patch)
treee4f740f031206e865166ee1cf9df1d46ef9e679f
parent31922b2bf232af8de55adde44138437aeefea1c8 (diff)
downloadseaweedfs-e31fdbc89b04d61e9498333a44ec3d9110ad5bba.tar.xz
seaweedfs-e31fdbc89b04d61e9498333a44ec3d9110ad5bba.zip
rename file
-rw-r--r--weed/util/limited_pool.go40
-rw-r--r--weed/util/limiter.go114
2 files changed, 40 insertions, 114 deletions
diff --git a/weed/util/limited_pool.go b/weed/util/limited_pool.go
new file mode 100644
index 000000000..91499632c
--- /dev/null
+++ b/weed/util/limited_pool.go
@@ -0,0 +1,40 @@
+package util
+
+// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
+
+// LimitedConcurrentExecutor object
+type LimitedConcurrentExecutor struct {
+ limit int
+ tokenChan chan int
+}
+
+func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
+
+ // allocate a limiter instance
+ c := &LimitedConcurrentExecutor{
+ limit: limit,
+ tokenChan: make(chan int, limit),
+ }
+
+ // allocate the tokenChan:
+ for i := 0; i < c.limit; i++ {
+ c.tokenChan <- i
+ }
+
+ return c
+}
+
+// Execute adds a function to the execution queue.
+// if num of go routines allocated by this instance is < limit
+// launch a new go routine to execute job
+// else wait until a go routine becomes available
+func (c *LimitedConcurrentExecutor) Execute(job func()) {
+ token := <-c.tokenChan
+ go func() {
+ defer func() {
+ c.tokenChan <- token
+ }()
+ // run the job
+ job()
+ }()
+}
diff --git a/weed/util/limiter.go b/weed/util/limiter.go
deleted file mode 100644
index 9d63c12a1..000000000
--- a/weed/util/limiter.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package util
-
-import (
- "math/rand"
- "reflect"
- "sync"
- "sync/atomic"
-)
-
-// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
-
-// LimitedConcurrentExecutor object
-type LimitedConcurrentExecutor struct {
- limit int
- tokenChan chan int
-}
-
-func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
-
- // allocate a limiter instance
- c := &LimitedConcurrentExecutor{
- limit: limit,
- tokenChan: make(chan int, limit),
- }
-
- // allocate the tokenChan:
- for i := 0; i < c.limit; i++ {
- c.tokenChan <- i
- }
-
- return c
-}
-
-// Execute adds a function to the execution queue.
-// if num of go routines allocated by this instance is < limit
-// launch a new go routine to execute job
-// else wait until a go routine becomes available
-func (c *LimitedConcurrentExecutor) Execute(job func()) {
- token := <-c.tokenChan
- go func() {
- defer func() {
- c.tokenChan <- token
- }()
- // run the job
- job()
- }()
-}
-
-// a different implementation, but somehow more "conservative"
-type OperationRequest func()
-
-type LimitedOutOfOrderProcessor struct {
- processorLimit int32
- processorLimitCond *sync.Cond
- processorSlots uint32
- processors []chan OperationRequest
- currentProcessor int32
-}
-
-func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
-
- processorSlots := uint32(32)
- c = &LimitedOutOfOrderProcessor{
- processorSlots: processorSlots,
- processors: make([]chan OperationRequest, processorSlots),
- processorLimit: limit,
- processorLimitCond: sync.NewCond(new(sync.Mutex)),
- }
-
- for i := 0; i < int(processorSlots); i++ {
- c.processors[i] = make(chan OperationRequest)
- }
-
- cases := make([]reflect.SelectCase, processorSlots)
- for i, ch := range c.processors {
- cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
- }
-
- go func() {
- for {
- _, value, ok := reflect.Select(cases)
- if !ok {
- continue
- }
-
- request := value.Interface().(OperationRequest)
-
- if c.processorLimit > 0 {
- c.processorLimitCond.L.Lock()
- for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
- c.processorLimitCond.Wait()
- }
- atomic.AddInt32(&c.currentProcessor, 1)
- c.processorLimitCond.L.Unlock()
- }
-
- go func() {
- if c.processorLimit > 0 {
- defer atomic.AddInt32(&c.currentProcessor, -1)
- defer c.processorLimitCond.Signal()
- }
- request()
- }()
-
- }
- }()
-
- return c
-}
-
-func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
- index := rand.Uint32() % c.processorSlots
- c.processors[index] <- request
-}