aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filesys/wfs.go4
-rw-r--r--weed/util/limiter.go40
2 files changed, 42 insertions, 2 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index f0ac6d80d..c6d9080a1 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -77,7 +77,7 @@ type WFS struct {
signature int32
// throttle writers
- concurrentWriters *util.LimitedOutOfOrderProcessor
+ concurrentWriters *util.LimitedConcurrentExecutor
Server *fs.Server
}
type statsCache struct {
@@ -135,7 +135,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.fsNodeCache = newFsCache(wfs.root)
if wfs.option.ConcurrentWriters > 0 {
- wfs.concurrentWriters = util.NewLimitedOutOfOrderProcessor(int32(wfs.option.ConcurrentWriters))
+ wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
}
return wfs
diff --git a/weed/util/limiter.go b/weed/util/limiter.go
index 2e5168d3d..ccdaa701e 100644
--- a/weed/util/limiter.go
+++ b/weed/util/limiter.go
@@ -7,6 +7,46 @@ import (
"sync/atomic"
)
+// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
+
+// LimitedConcurrentExecutor object
+type LimitedConcurrentExecutor struct {
+ limit int
+ tokenChan chan int
+}
+
+func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
+
+ // allocate a limiter instance
+ c := &LimitedConcurrentExecutor{
+ limit: limit,
+ tokenChan: make(chan int, limit),
+ }
+
+ // allocate the tokenChan:
+ for i := 0; i < c.limit; i++ {
+ c.tokenChan <- i
+ }
+
+ return c
+}
+
+// Execute adds a function to the execution queue.
+// if num of go routines allocated by this instance is < limit
+// launch a new go routine to execute job
+// else wait until a go routine becomes available
+func (c *LimitedConcurrentExecutor) Execute(job func()) {
+ token := <-c.tokenChan
+ go func() {
+ defer func() {
+ c.tokenChan <- token
+ }()
+ // run the job
+ job()
+ }()
+}
+
+// a different implementation, but somehow more "conservative"
type OperationRequest func()
type LimitedOutOfOrderProcessor struct {