diff options
Diffstat (limited to 'weed/util/limited_async_pool.go')
| -rw-r--r-- | weed/util/limited_async_pool.go | 32 |
1 files changed, 27 insertions, 5 deletions
diff --git a/weed/util/limited_async_pool.go b/weed/util/limited_async_pool.go index c78de158b..51dfd6252 100644 --- a/weed/util/limited_async_pool.go +++ b/weed/util/limited_async_pool.go @@ -2,7 +2,11 @@ package util // initial version comes from https://hackernoon.com/asyncawait-in-golang-an-introductory-guide-ol1e34sg -import "context" +import ( + "container/list" + "context" + "sync" +) type Future interface { Await() interface{} @@ -17,23 +21,27 @@ func (f future) Await() interface{} { } type LimitedAsyncExecutor struct { - executor *LimitedConcurrentExecutor + executor *LimitedConcurrentExecutor + futureList *list.List + futureListCond *sync.Cond } func NewLimitedAsyncExecutor(limit int) *LimitedAsyncExecutor { return &LimitedAsyncExecutor{ - executor: NewLimitedConcurrentExecutor(limit), + executor: NewLimitedConcurrentExecutor(limit), + futureList: list.New(), + futureListCond: sync.NewCond(&sync.Mutex{}), } } -func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) Future { +func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) { var result interface{} c := make(chan struct{}) ae.executor.Execute(func() { defer close(c) result = job() }) - return future{await: func(ctx context.Context) interface{} { + f := future{await: func(ctx context.Context) interface{} { select { case <-ctx.Done(): return ctx.Err() @@ -41,4 +49,18 @@ func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) Future { return result } }} + ae.futureListCond.L.Lock() + ae.futureList.PushBack(f) + ae.futureListCond.Signal() + ae.futureListCond.L.Unlock() +} + +func (ae *LimitedAsyncExecutor) NextFuture() Future { + ae.futureListCond.L.Lock() + for ae.futureList.Len() == 0 { + ae.futureListCond.Wait() + } + f := ae.futureList.Remove(ae.futureList.Front()) + ae.futureListCond.L.Unlock() + return f.(Future) } |
