diff options
| author | Lisandro Pin <lisandro.pin@proton.ch> | 2024-12-12 18:14:44 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-12-12 09:14:44 -0800 |
| commit | b0210df08151e32cb67d5199008870f5fc25827a (patch) | |
| tree | 28cbfebbe2289ee4059fde4eed16a741c820110c /weed/shell/command_ec_common.go | |
| parent | 23ffbb083c4bcc9d723ce5857e08f85e7205140a (diff) | |
| download | seaweedfs-b0210df08151e32cb67d5199008870f5fc25827a.tar.xz seaweedfs-b0210df08151e32cb67d5199008870f5fc25827a.zip | |
Begin implementing EC balancing parallelization support. (#6342)
* Begin implementing EC balancing parallelization support.
Impacts both `ec.encode` and `ec.balance`.
* Nit: improve type naming.
* Make the goroutine workgroup handler for `EcBalance()` a bit smarter/error-proof.
* Nit: unify naming for `ecBalancer` wait group methods with the rest of the module.
* Fix concurrency bug.
* Fix whitespace after GitLab automerge.
* Delete stray TODO.
Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 61 |
1 files changed, 54 insertions, 7 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 30667896a..b5ba657e1 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand/v2" "sort" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -556,6 +557,48 @@ type ecBalancer struct { ecNodes []*EcNode replicaPlacement *super_block.ReplicaPlacement applyBalancing bool + parallelize bool + + wg *sync.WaitGroup + // TODO: Maybe accumulate all errors instead of just the last one. + wgError error +} + +type ecBalancerTask func() error + +func (ecb *ecBalancer) wgInit() { + if ecb.wg == nil { + ecb.wg = &sync.WaitGroup{} + ecb.wgError = nil + } +} + +func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { + if ecb.wg == nil || !ecb.parallelize { + if err := f(); err != nil { + ecb.wgError = err + } + return + } + + ecb.wg.Add(1) + go func() { + if err := f(); err != nil { + ecb.wgError = err + } + ecb.wg.Done() + }() +} + +func (ecb *ecBalancer) wgWait() error { + if ecb.wg != nil { + ecb.wg.Wait() + } + err := ecb.wgError + ecb.wg = nil + ecb.wgError = nil + + return err } func (ecb *ecBalancer) racks() map[RackId]*EcRack { @@ -592,15 +635,15 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error { } func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error { - // vid => []ecNode vidLocations := ecb.collectVolumeIdToEcNodes(collection) - // deduplicate ec shards + + ecb.wgInit() for vid, locations := range vidLocations { - if err := ecb.doDeduplicateEcShards(collection, vid, locations); err != nil { - return err - } + ecb.wgAdd(func() error { + return ecb.doDeduplicateEcShards(collection, vid, locations) + }) } - return nil + return ecb.wgWait() } func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error { @@ -636,6 +679,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum return nil } +// TODO: enable 
parallelization func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error { // collect vid => []ecNode, since previous steps can change the locations vidLocations := ecb.collectVolumeIdToEcNodes(collection) @@ -741,6 +785,7 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR return targets[rand.IntN(len(targets))], nil } +// TODO: enable parallelization func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { // collect vid => []ecNode, since previous steps can change the locations vidLocations := ecb.collectVolumeIdToEcNodes(collection) @@ -809,6 +854,7 @@ func (ecb *ecBalancer) balanceEcRacks() error { return nil } +// TODO: enable parallelization func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { if len(ecRack.ecNodes) <= 1 { return nil @@ -1001,7 +1047,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo return vidLocations } -func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) { +func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, parallelize bool, applyBalancing bool) (err error) { if len(collections) == 0 { return fmt.Errorf("no collections to balance") } @@ -1020,6 +1066,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic ecNodes: allEcNodes, replicaPlacement: ecReplicaPlacement, applyBalancing: applyBalancing, + parallelize: parallelize, } for _, c := range collections { |
