diff options
| author | Chris Lu <chris.lu@uber.com> | 2021-04-02 02:21:38 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@uber.com> | 2021-04-02 02:21:38 -0700 |
| commit | bdf2ddddfdccbc3d48e763457a779c8cc9ece6ac (patch) | |
| tree | 3d776b98e42f977b5113fd1db685cfedf4c273d1 /weed | |
| parent | 67e019d54b2f7bde826584c1b752305ec1702ca4 (diff) | |
| download | seaweedfs-bdf2ddddfdccbc3d48e763457a779c8cc9ece6ac.tar.xz seaweedfs-bdf2ddddfdccbc3d48e763457a779c8cc9ece6ac.zip | |
revert to same implementation as before
This reverts commit 7e8edc3c4adaf1251f5c773cbeaf3a868269f97a.
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/filesys/wfs.go | 4 | ||||
| -rw-r--r-- | weed/util/limiter.go | 40 |
2 files changed, 42 insertions, 2 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index f0ac6d80d..c6d9080a1 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -77,7 +77,7 @@ type WFS struct { signature int32 // throttle writers - concurrentWriters *util.LimitedOutOfOrderProcessor + concurrentWriters *util.LimitedConcurrentExecutor Server *fs.Server } type statsCache struct { @@ -135,7 +135,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.fsNodeCache = newFsCache(wfs.root) if wfs.option.ConcurrentWriters > 0 { - wfs.concurrentWriters = util.NewLimitedOutOfOrderProcessor(int32(wfs.option.ConcurrentWriters)) + wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters) } return wfs diff --git a/weed/util/limiter.go b/weed/util/limiter.go index 2e5168d3d..ccdaa701e 100644 --- a/weed/util/limiter.go +++ b/weed/util/limiter.go @@ -7,6 +7,46 @@ import ( "sync/atomic" ) +// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go + +// LimitedConcurrentExecutor object +type LimitedConcurrentExecutor struct { + limit int + tokenChan chan int +} + +func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor { + + // allocate a limiter instance + c := &LimitedConcurrentExecutor{ + limit: limit, + tokenChan: make(chan int, limit), + } + + // allocate the tokenChan: + for i := 0; i < c.limit; i++ { + c.tokenChan <- i + } + + return c +} + +// Execute adds a function to the execution queue. +// if num of go routines allocated by this instance is < limit +// launch a new go routine to execute job +// else wait until a go routine becomes available +func (c *LimitedConcurrentExecutor) Execute(job func()) { + token := <-c.tokenChan + go func() { + defer func() { + c.tokenChan <- token + }() + // run the job + job() + }() +} + +// a different implementation, but somehow more "conservative" type OperationRequest func() type LimitedOutOfOrderProcessor struct { |
