aboutsummaryrefslogtreecommitdiff
path: root/weed/util/limited_async_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/limited_async_pool.go')
-rw-r--r--weed/util/limited_async_pool.go44
1 files changed, 44 insertions, 0 deletions
diff --git a/weed/util/limited_async_pool.go b/weed/util/limited_async_pool.go
new file mode 100644
index 000000000..c78de158b
--- /dev/null
+++ b/weed/util/limited_async_pool.go
@@ -0,0 +1,44 @@
+package util
+
+// initial version comes from https://hackernoon.com/asyncawait-in-golang-an-introductory-guide-ol1e34sg
+
+import "context"
+
+type Future interface {
+ Await() interface{}
+}
+
+type future struct {
+ await func(ctx context.Context) interface{}
+}
+
+func (f future) Await() interface{} {
+ return f.await(context.Background())
+}
+
+type LimitedAsyncExecutor struct {
+ executor *LimitedConcurrentExecutor
+}
+
+func NewLimitedAsyncExecutor(limit int) *LimitedAsyncExecutor {
+ return &LimitedAsyncExecutor{
+ executor: NewLimitedConcurrentExecutor(limit),
+ }
+}
+
+func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) Future {
+ var result interface{}
+ c := make(chan struct{})
+ ae.executor.Execute(func() {
+ defer close(c)
+ result = job()
+ })
+ return future{await: func(ctx context.Context) interface{} {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-c:
+ return result
+ }
+ }}
+}