diff options
| author | Lisandro Pin <lisandro.pin@proton.ch> | 2024-12-13 14:33:53 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-12-13 05:33:53 -0800 |
| commit | b81def5e5c3e936098aba07f203796890f2acf69 (patch) | |
| tree | a221ddbae5db9251d6b0b075a29d612390a61aab /weed/shell/command_ec_common.go | |
| parent | d6f3e1970d91da384d842f0811288391dcd78bb7 (diff) | |
| download | seaweedfs-b81def5e5c3e936098aba07f203796890f2acf69.tar.xz seaweedfs-b81def5e5c3e936098aba07f203796890f2acf69.zip | |
Parallelize EC balancing for racks. (#6351)
Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index b5ba657e1..c38518664 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -559,24 +559,24 @@ type ecBalancer struct { applyBalancing bool parallelize bool - wg *sync.WaitGroup - // TODO: Maybe accumulate all errors instead of just the last one. - wgError error + wg *sync.WaitGroup + wgErrors []error } type ecBalancerTask func() error func (ecb *ecBalancer) wgInit() { - if ecb.wg == nil { - ecb.wg = &sync.WaitGroup{} - ecb.wgError = nil + if ecb.wg != nil { + return } + ecb.wg = &sync.WaitGroup{} + ecb.wgErrors = nil } func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { if ecb.wg == nil || !ecb.parallelize { if err := f(); err != nil { - ecb.wgError = err + ecb.wgErrors = append(ecb.wgErrors, err) } return } @@ -584,7 +584,7 @@ func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { ecb.wg.Add(1) go func() { if err := f(); err != nil { - ecb.wgError = err + ecb.wgErrors = append(ecb.wgErrors, err) } ecb.wg.Done() }() @@ -594,9 +594,9 @@ func (ecb *ecBalancer) wgWait() error { if ecb.wg != nil { ecb.wg.Wait() } - err := ecb.wgError + err := errors.Join(ecb.wgErrors...) ecb.wg = nil - ecb.wgError = nil + ecb.wgErrors = nil return err } @@ -846,15 +846,15 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int func (ecb *ecBalancer) balanceEcRacks() error { // balance one rack for all ec shards + ecb.wgInit() for _, ecRack := range ecb.racks() { - if err := ecb.doBalanceEcRack(ecRack); err != nil { - return err - } + ecb.wgAdd(func() error { + return ecb.doBalanceEcRack(ecRack) + }) } - return nil + return ecb.wgWait() } -// TODO: enable parallelization func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { if len(ecRack.ecNodes) <= 1 { return nil |
