aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/util/limited_async_pool.go32
-rw-r--r--weed/util/limited_async_pool_test.go16
2 files changed, 35 insertions, 13 deletions
diff --git a/weed/util/limited_async_pool.go b/weed/util/limited_async_pool.go
index c78de158b..51dfd6252 100644
--- a/weed/util/limited_async_pool.go
+++ b/weed/util/limited_async_pool.go
@@ -2,7 +2,11 @@ package util
// initial version comes from https://hackernoon.com/asyncawait-in-golang-an-introductory-guide-ol1e34sg
-import "context"
+import (
+ "container/list"
+ "context"
+ "sync"
+)
type Future interface {
Await() interface{}
@@ -17,23 +21,27 @@ func (f future) Await() interface{} {
}
type LimitedAsyncExecutor struct {
- executor *LimitedConcurrentExecutor
+ executor *LimitedConcurrentExecutor
+ futureList *list.List
+ futureListCond *sync.Cond
}
func NewLimitedAsyncExecutor(limit int) *LimitedAsyncExecutor {
return &LimitedAsyncExecutor{
- executor: NewLimitedConcurrentExecutor(limit),
+ executor: NewLimitedConcurrentExecutor(limit),
+ futureList: list.New(),
+ futureListCond: sync.NewCond(&sync.Mutex{}),
}
}
-func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) Future {
+func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) {
var result interface{}
c := make(chan struct{})
ae.executor.Execute(func() {
defer close(c)
result = job()
})
- return future{await: func(ctx context.Context) interface{} {
+ f := future{await: func(ctx context.Context) interface{} {
select {
case <-ctx.Done():
return ctx.Err()
@@ -41,4 +49,18 @@ func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) Future {
return result
}
}}
+ ae.futureListCond.L.Lock()
+ ae.futureList.PushBack(f)
+ ae.futureListCond.Signal()
+ ae.futureListCond.L.Unlock()
+}
+
+func (ae *LimitedAsyncExecutor) NextFuture() Future {
+ ae.futureListCond.L.Lock()
+ for ae.futureList.Len() == 0 {
+ ae.futureListCond.Wait()
+ }
+ f := ae.futureList.Remove(ae.futureList.Front())
+ ae.futureListCond.L.Unlock()
+ return f.(Future)
}
diff --git a/weed/util/limited_async_pool_test.go b/weed/util/limited_async_pool_test.go
index 935b158da..090ce5375 100644
--- a/weed/util/limited_async_pool_test.go
+++ b/weed/util/limited_async_pool_test.go
@@ -10,17 +10,17 @@ import (
func TestAsyncPool(t *testing.T) {
p := NewLimitedAsyncExecutor(3)
- var results []Future
- results = append(results, p.Execute(FirstFunc))
- results = append(results, p.Execute(SecondFunc))
- results = append(results, p.Execute(ThirdFunc))
- results = append(results, p.Execute(FourthFunc))
- results = append(results, p.Execute(FifthFunc))
+ p.Execute(FirstFunc)
+ p.Execute(SecondFunc)
+ p.Execute(ThirdFunc)
+ p.Execute(FourthFunc)
+ p.Execute(FifthFunc)
var sorted_results []int
- for _, r := range results {
- x := r.Await().(int)
+ for i := 0; i < 5; i++ {
+ f := p.NextFuture()
+ x := f.Await().(int)
println(x)
sorted_results = append(sorted_results, x)
}