aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_common.go
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2024-12-13 14:33:53 +0100
committerGitHub <noreply@github.com>2024-12-13 05:33:53 -0800
commitb81def5e5c3e936098aba07f203796890f2acf69 (patch)
treea221ddbae5db9251d6b0b075a29d612390a61aab /weed/shell/command_ec_common.go
parentd6f3e1970d91da384d842f0811288391dcd78bb7 (diff)
downloadseaweedfs-b81def5e5c3e936098aba07f203796890f2acf69.tar.xz
seaweedfs-b81def5e5c3e936098aba07f203796890f2acf69.zip
Parallelize EC balancing for racks. (#6351)
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--weed/shell/command_ec_common.go30
1 files changed, 15 insertions, 15 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index b5ba657e1..c38518664 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -559,24 +559,24 @@ type ecBalancer struct {
applyBalancing bool
parallelize bool
- wg *sync.WaitGroup
- // TODO: Maybe accumulate all errors instead of just the last one.
- wgError error
+ wg *sync.WaitGroup
+ wgErrors []error
}
type ecBalancerTask func() error
func (ecb *ecBalancer) wgInit() {
- if ecb.wg == nil {
- ecb.wg = &sync.WaitGroup{}
- ecb.wgError = nil
+ if ecb.wg != nil {
+ return
}
+ ecb.wg = &sync.WaitGroup{}
+ ecb.wgErrors = nil
}
func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
if ecb.wg == nil || !ecb.parallelize {
if err := f(); err != nil {
- ecb.wgError = err
+ ecb.wgErrors = append(ecb.wgErrors, err)
}
return
}
@@ -584,7 +584,7 @@ func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
ecb.wg.Add(1)
go func() {
if err := f(); err != nil {
- ecb.wgError = err
+ ecb.wgErrors = append(ecb.wgErrors, err)
}
ecb.wg.Done()
}()
@@ -594,9 +594,9 @@ func (ecb *ecBalancer) wgWait() error {
if ecb.wg != nil {
ecb.wg.Wait()
}
- err := ecb.wgError
+ err := errors.Join(ecb.wgErrors...)
ecb.wg = nil
- ecb.wgError = nil
+ ecb.wgErrors = nil
return err
}
@@ -846,15 +846,15 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int
func (ecb *ecBalancer) balanceEcRacks() error {
// balance one rack for all ec shards
+ ecb.wgInit()
for _, ecRack := range ecb.racks() {
- if err := ecb.doBalanceEcRack(ecRack); err != nil {
- return err
- }
+ ecb.wgAdd(func() error {
+ return ecb.doBalanceEcRack(ecRack)
+ })
}
- return nil
+ return ecb.wgWait()
}
-// TODO: enable parallelization
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
if len(ecRack.ecNodes) <= 1 {
return nil