diff options
| author | Lisandro Pin <lisandro.pin@proton.ch> | 2025-05-12 23:38:55 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-05-12 14:38:55 -0700 |
| commit | ba1d82db90235fb7ee589bce94cd575e2a16e26e (patch) | |
| tree | 788c5e7f301ca4f51044c838850801d51f15dab6 /weed/shell/common.go | |
| parent | b0f7c33ce0a50fd6debe86264be8e7cbf2a52065 (diff) | |
| download | seaweedfs-ba1d82db90235fb7ee589bce94cd575e2a16e26e.tar.xz seaweedfs-ba1d82db90235fb7ee589bce94cd575e2a16e26e.zip | |
Move `shell.ErrorWaitGroup` into a common file, to cleanly reuse across `weed shell` commands. (#6780)
Move `shell.ErrorWaitGroup` into a dedicated common file, to cleanly reuse across `weed shell` commands.
Diffstat (limited to 'weed/shell/common.go')
| -rw-r--r-- | weed/shell/common.go | 66 |
1 files changed, 66 insertions, 0 deletions
diff --git a/weed/shell/common.go b/weed/shell/common.go new file mode 100644 index 000000000..9fe190b31 --- /dev/null +++ b/weed/shell/common.go @@ -0,0 +1,66 @@ +package shell + +import ( + "errors" + "sync" +) + +// ErrorWaitGroup implements a goroutine wait group which aggregates errors, if any. +type ErrorWaitGroup struct { + maxConcurrency int + wg *sync.WaitGroup + wgSem chan bool + errors []error + errorsMu sync.Mutex +} + +type ErrorWaitGroupTask func() error + +func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup { + if maxConcurrency <= 0 { + // no concurrency = one task at the time + maxConcurrency = 1 + } + return &ErrorWaitGroup{ + maxConcurrency: maxConcurrency, + wg: &sync.WaitGroup{}, + wgSem: make(chan bool, maxConcurrency), + } +} + +// Reset restarts an ErrorWaitGroup, keeping original settings. Errors and pending goroutines, if any, are flushed. +func (ewg *ErrorWaitGroup) Reset() { + close(ewg.wgSem) + + ewg.wg = &sync.WaitGroup{} + ewg.wgSem = make(chan bool, ewg.maxConcurrency) + ewg.errors = nil +} + +// Add queues an ErrorWaitGroupTask to be executed as a goroutine. +func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) { + if ewg.maxConcurrency <= 1 { + // keep run order deterministic when parallelization is off + ewg.errors = append(ewg.errors, f()) + return + } + + ewg.wg.Add(1) + go func() { + ewg.wgSem <- true + + err := f() + ewg.errorsMu.Lock() + ewg.errors = append(ewg.errors, err) + ewg.errorsMu.Unlock() + + <-ewg.wgSem + ewg.wg.Done() + }() +} + +// Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them. +func (ewg *ErrorWaitGroup) Wait() error { + ewg.wg.Wait() + return errors.Join(ewg.errors...) +} |
