aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/common.go
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2025-05-12 23:38:55 +0200
committerGitHub <noreply@github.com>2025-05-12 14:38:55 -0700
commitba1d82db90235fb7ee589bce94cd575e2a16e26e (patch)
tree788c5e7f301ca4f51044c838850801d51f15dab6 /weed/shell/common.go
parentb0f7c33ce0a50fd6debe86264be8e7cbf2a52065 (diff)
downloadseaweedfs-ba1d82db90235fb7ee589bce94cd575e2a16e26e.tar.xz
seaweedfs-ba1d82db90235fb7ee589bce94cd575e2a16e26e.zip
Move `shell.ErrorWaitGroup` into a common file, to cleanly reuse across `weed shell` commands. (#6780)
Move `shell.ErrorWaitGroup` into a dedicated common file, to cleanly reuse across `weed shell` commands.
Diffstat (limited to 'weed/shell/common.go')
-rw-r--r--weed/shell/common.go66
1 files changed, 66 insertions, 0 deletions
diff --git a/weed/shell/common.go b/weed/shell/common.go
new file mode 100644
index 000000000..9fe190b31
--- /dev/null
+++ b/weed/shell/common.go
@@ -0,0 +1,66 @@
+package shell
+
+import (
+ "errors"
+ "sync"
+)
+
+// ErrorWaitGroup implements a goroutine wait group which aggregates errors, if any.
+type ErrorWaitGroup struct {
+ maxConcurrency int
+ wg *sync.WaitGroup
+ wgSem chan bool
+ errors []error
+ errorsMu sync.Mutex
+}
+
+type ErrorWaitGroupTask func() error
+
+func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
+ if maxConcurrency <= 0 {
+ // no concurrency = one task at the time
+ maxConcurrency = 1
+ }
+ return &ErrorWaitGroup{
+ maxConcurrency: maxConcurrency,
+ wg: &sync.WaitGroup{},
+ wgSem: make(chan bool, maxConcurrency),
+ }
+}
+
+// Reset restarts an ErrorWaitGroup, keeping original settings. Errors and pending goroutines, if any, are flushed.
+func (ewg *ErrorWaitGroup) Reset() {
+ close(ewg.wgSem)
+
+ ewg.wg = &sync.WaitGroup{}
+ ewg.wgSem = make(chan bool, ewg.maxConcurrency)
+ ewg.errors = nil
+}
+
+// Add queues an ErrorWaitGroupTask to be executed as a goroutine.
+func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
+ if ewg.maxConcurrency <= 1 {
+ // keep run order deterministic when parallelization is off
+ ewg.errors = append(ewg.errors, f())
+ return
+ }
+
+ ewg.wg.Add(1)
+ go func() {
+ ewg.wgSem <- true
+
+ err := f()
+ ewg.errorsMu.Lock()
+ ewg.errors = append(ewg.errors, err)
+ ewg.errorsMu.Unlock()
+
+ <-ewg.wgSem
+ ewg.wg.Done()
+ }()
+}
+
+// Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them.
+func (ewg *ErrorWaitGroup) Wait() error {
+ ewg.wg.Wait()
+ return errors.Join(ewg.errors...)
+}