aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-09-25 13:45:55 -0700
committerchrislu <chris.lu@gmail.com>2022-09-25 13:45:55 -0700
commit5c8f1467a1c2e6db6025f387d779ecec222fb9b5 (patch)
tree04ecccda27061ff17d3c171a75138d952bebc1c1
parent973b8ff067bf4590dd224e0ed1bfba5b2af60910 (diff)
downloadseaweedfs-5c8f1467a1c2e6db6025f387d779ecec222fb9b5.tar.xz
seaweedfs-5c8f1467a1c2e6db6025f387d779ecec222fb9b5.zip
ordered execution async wait
-rw-r--r--weed/util/limited_async_pool.go44
-rw-r--r--weed/util/limited_async_pool_test.go58
2 files changed, 102 insertions, 0 deletions
diff --git a/weed/util/limited_async_pool.go b/weed/util/limited_async_pool.go
new file mode 100644
index 000000000..c78de158b
--- /dev/null
+++ b/weed/util/limited_async_pool.go
@@ -0,0 +1,44 @@
+package util
+
+// initial version comes from https://hackernoon.com/asyncawait-in-golang-an-introductory-guide-ol1e34sg
+
+import "context"
+
+type Future interface {
+ Await() interface{}
+}
+
+type future struct {
+ await func(ctx context.Context) interface{}
+}
+
+func (f future) Await() interface{} {
+ return f.await(context.Background())
+}
+
+type LimitedAsyncExecutor struct {
+ executor *LimitedConcurrentExecutor
+}
+
+func NewLimitedAsyncExecutor(limit int) *LimitedAsyncExecutor {
+ return &LimitedAsyncExecutor{
+ executor: NewLimitedConcurrentExecutor(limit),
+ }
+}
+
+func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) Future {
+ var result interface{}
+ c := make(chan struct{})
+ ae.executor.Execute(func() {
+ defer close(c)
+ result = job()
+ })
+ return future{await: func(ctx context.Context) interface{} {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-c:
+ return result
+ }
+ }}
+}
diff --git a/weed/util/limited_async_pool_test.go b/weed/util/limited_async_pool_test.go
new file mode 100644
index 000000000..29a3d0498
--- /dev/null
+++ b/weed/util/limited_async_pool_test.go
@@ -0,0 +1,58 @@
+package util
+
+import (
+ "fmt"
+ "testing"
+ "time"
+)
+
+func TestAsyncPool(t *testing.T) {
+ p := NewLimitedAsyncExecutor(3)
+ var results []Future
+
+ results = append(results, p.Execute(FirstFunc))
+ results = append(results, p.Execute(SecondFunc))
+ results = append(results, p.Execute(ThirdFunc))
+ results = append(results, p.Execute(FourthFunc))
+ results = append(results, p.Execute(FifthFunc))
+
+ for _, r := range results {
+ x := r.Await().(int)
+ println(x)
+ }
+}
+
+func FirstFunc() any {
+ fmt.Println("-- Executing first function --")
+ time.Sleep(7 * time.Second)
+ fmt.Println("-- First Function finished --")
+ return 1
+}
+
+func SecondFunc() any {
+ fmt.Println("-- Executing second function --")
+ time.Sleep(5 * time.Second)
+ fmt.Println("-- Second Function finished --")
+ return 2
+}
+
+func ThirdFunc() any {
+ fmt.Println("-- Executing third function --")
+ time.Sleep(2 * time.Second)
+ fmt.Println("-- Third Function finished --")
+ return 3
+}
+
+func FourthFunc() any {
+ fmt.Println("-- Executing fourth function --")
+ time.Sleep(10 * time.Second)
+ fmt.Println("-- Fourth Function finished --")
+ return 4
+}
+
+func FifthFunc() any {
+ fmt.Println("-- Executing fifth function --")
+ time.Sleep(4 * time.Second)
+ fmt.Println("-- Fourth fifth finished --")
+ return 5
+}