aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2025-05-12 23:38:55 +0200
committerGitHub <noreply@github.com>2025-05-12 14:38:55 -0700
commitba1d82db90235fb7ee589bce94cd575e2a16e26e (patch)
tree788c5e7f301ca4f51044c838850801d51f15dab6
parentb0f7c33ce0a50fd6debe86264be8e7cbf2a52065 (diff)
downloadseaweedfs-ba1d82db90235fb7ee589bce94cd575e2a16e26e.tar.xz
seaweedfs-ba1d82db90235fb7ee589bce94cd575e2a16e26e.zip
Move `shell.ErrorWaitGroup` into a common file, to cleanly reuse across `weed shell` commands. (#6780)
Move `shell.ErrorWaitGroup` into a dedicated common file, to cleanly reuse across `weed shell` commands.
-rw-r--r--weed/shell/command_ec_common.go56
-rw-r--r--weed/shell/common.go66
2 files changed, 66 insertions, 56 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 209d8a733..a6f27232e 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -7,7 +7,6 @@ import (
"math/rand/v2"
"slices"
"sort"
- "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -113,61 +112,6 @@ var (
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
-type ErrorWaitGroup struct {
- maxConcurrency int
- wg *sync.WaitGroup
- wgSem chan bool
- errors []error
- errorsMu sync.Mutex
-}
-type ErrorWaitGroupTask func() error
-
-func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
- if maxConcurrency <= 0 {
- // No concurrency = one task at the time
- maxConcurrency = 1
- }
- return &ErrorWaitGroup{
- maxConcurrency: maxConcurrency,
- wg: &sync.WaitGroup{},
- wgSem: make(chan bool, maxConcurrency),
- }
-}
-
-func (ewg *ErrorWaitGroup) Reset() {
- close(ewg.wgSem)
-
- ewg.wg = &sync.WaitGroup{}
- ewg.wgSem = make(chan bool, ewg.maxConcurrency)
- ewg.errors = nil
-}
-
-func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
- if ewg.maxConcurrency <= 1 {
- // Keep run order deterministic when parallelization is off
- ewg.errors = append(ewg.errors, f())
- return
- }
-
- ewg.wg.Add(1)
- go func() {
- ewg.wgSem <- true
-
- err := f()
- ewg.errorsMu.Lock()
- ewg.errors = append(ewg.errors, err)
- ewg.errorsMu.Unlock()
-
- <-ewg.wgSem
- ewg.wg.Done()
- }()
-}
-
-func (ewg *ErrorWaitGroup) Wait() error {
- ewg.wg.Wait()
- return errors.Join(ewg.errors...)
-}
-
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
var resp *master_pb.GetMasterConfigurationResponse
var err error
diff --git a/weed/shell/common.go b/weed/shell/common.go
new file mode 100644
index 000000000..9fe190b31
--- /dev/null
+++ b/weed/shell/common.go
@@ -0,0 +1,66 @@
+package shell
+
+import (
+ "errors"
+ "sync"
+)
+
+// ErrorWaitGroup implements a goroutine wait group which aggregates errors, if any.
+type ErrorWaitGroup struct {
+ maxConcurrency int
+ wg *sync.WaitGroup
+ wgSem chan bool
+ errors []error
+ errorsMu sync.Mutex
+}
+
+type ErrorWaitGroupTask func() error
+
+func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
+ if maxConcurrency <= 0 {
+ // no concurrency = one task at the time
+ maxConcurrency = 1
+ }
+ return &ErrorWaitGroup{
+ maxConcurrency: maxConcurrency,
+ wg: &sync.WaitGroup{},
+ wgSem: make(chan bool, maxConcurrency),
+ }
+}
+
+// Reset restarts an ErrorWaitGroup, keeping original settings. Errors and pending goroutines, if any, are flushed.
+func (ewg *ErrorWaitGroup) Reset() {
+ close(ewg.wgSem)
+
+ ewg.wg = &sync.WaitGroup{}
+ ewg.wgSem = make(chan bool, ewg.maxConcurrency)
+ ewg.errors = nil
+}
+
+// Add queues an ErrorWaitGroupTask to be executed as a goroutine.
+func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
+ if ewg.maxConcurrency <= 1 {
+ // keep run order deterministic when parallelization is off
+ ewg.errors = append(ewg.errors, f())
+ return
+ }
+
+ ewg.wg.Add(1)
+ go func() {
+ ewg.wgSem <- true
+
+ err := f()
+ ewg.errorsMu.Lock()
+ ewg.errors = append(ewg.errors, err)
+ ewg.errorsMu.Unlock()
+
+ <-ewg.wgSem
+ ewg.wg.Done()
+ }()
+}
+
+// Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them.
+func (ewg *ErrorWaitGroup) Wait() error {
+ ewg.wg.Wait()
+ return errors.Join(ewg.errors...)
+}