diff options
Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 61 |
1 files changed, 54 insertions, 7 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 30667896a..b5ba657e1 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand/v2" "sort" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -556,6 +557,48 @@ type ecBalancer struct { ecNodes []*EcNode replicaPlacement *super_block.ReplicaPlacement applyBalancing bool + parallelize bool + + wg *sync.WaitGroup + // TODO: Maybe accumulate all errors instead of just the last one. + wgError error +} + +type ecBalancerTask func() error + +func (ecb *ecBalancer) wgInit() { + if ecb.wg == nil { + ecb.wg = &sync.WaitGroup{} + ecb.wgError = nil + } +} + +func (ecb *ecBalancer) wgAdd(f ecBalancerTask) { + if ecb.wg == nil || !ecb.parallelize { + if err := f(); err != nil { + ecb.wgError = err + } + return + } + + ecb.wg.Add(1) + go func() { + if err := f(); err != nil { + ecb.wgError = err + } + ecb.wg.Done() + }() +} + +func (ecb *ecBalancer) wgWait() error { + if ecb.wg != nil { + ecb.wg.Wait() + } + err := ecb.wgError + ecb.wg = nil + ecb.wgError = nil + + return err } func (ecb *ecBalancer) racks() map[RackId]*EcRack { @@ -592,15 +635,15 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error { } func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error { - // vid => []ecNode vidLocations := ecb.collectVolumeIdToEcNodes(collection) - // deduplicate ec shards + + ecb.wgInit() for vid, locations := range vidLocations { - if err := ecb.doDeduplicateEcShards(collection, vid, locations); err != nil { - return err - } + ecb.wgAdd(func() error { + return ecb.doDeduplicateEcShards(collection, vid, locations) + }) } - return nil + return ecb.wgWait() } func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error { @@ -636,6 +679,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum return nil } +// TODO: enable parallelization func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error { // collect vid => []ecNode, since previous steps can change the locations vidLocations := ecb.collectVolumeIdToEcNodes(collection) @@ -741,6 +785,7 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR return targets[rand.IntN(len(targets))], nil } +// TODO: enable parallelization func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error { // collect vid => []ecNode, since previous steps can change the locations vidLocations := ecb.collectVolumeIdToEcNodes(collection) @@ -809,6 +854,7 @@ func (ecb *ecBalancer) balanceEcRacks() error { return nil } +// TODO: enable parallelization func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { if len(ecRack.ecNodes) <= 1 { return nil @@ -1001,7 +1047,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo return vidLocations } -func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) { +func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, parallelize bool, applyBalancing bool) (err error) { if len(collections) == 0 { return fmt.Errorf("no collections to balance") } @@ -1020,6 +1066,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic ecNodes: allEcNodes, replicaPlacement: ecReplicaPlacement, applyBalancing: applyBalancing, + parallelize: parallelize, } for _, c := range collections { |
