aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_common.go
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2024-12-12 18:14:44 +0100
committerGitHub <noreply@github.com>2024-12-12 09:14:44 -0800
commitb0210df08151e32cb67d5199008870f5fc25827a (patch)
tree28cbfebbe2289ee4059fde4eed16a741c820110c /weed/shell/command_ec_common.go
parent23ffbb083c4bcc9d723ce5857e08f85e7205140a (diff)
downloadseaweedfs-b0210df08151e32cb67d5199008870f5fc25827a.tar.xz
seaweedfs-b0210df08151e32cb67d5199008870f5fc25827a.zip
Begin implementing EC balancing parallelization support. (#6342)
* Begin implementing EC balancing parallelization support. Impacts both `ec.encode` and `ec.balance`, * Nit: improve type naming. * Make the goroutine workgroup handler for `EcBalance()` a bit smarter/error-proof. * Nit: unify naming for `ecBalancer` wait group methods with the rest of the module. * Fix concurrency bug. * Fix whitespace after Gitlab automerge. * Delete stray TODO.
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--weed/shell/command_ec_common.go61
1 files changed, 54 insertions, 7 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 30667896a..b5ba657e1 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -6,6 +6,7 @@ import (
"fmt"
"math/rand/v2"
"sort"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -556,6 +557,48 @@ type ecBalancer struct {
ecNodes []*EcNode
replicaPlacement *super_block.ReplicaPlacement
applyBalancing bool
+ parallelize bool
+
+ wg *sync.WaitGroup
+ // TODO: Maybe accumulate all errors instead of just the last one.
+ wgError error
+}
+
+type ecBalancerTask func() error
+
+func (ecb *ecBalancer) wgInit() {
+ if ecb.wg == nil {
+ ecb.wg = &sync.WaitGroup{}
+ ecb.wgError = nil
+ }
+}
+
+func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
+ if ecb.wg == nil || !ecb.parallelize {
+ if err := f(); err != nil {
+ ecb.wgError = err
+ }
+ return
+ }
+
+ ecb.wg.Add(1)
+ go func() {
+ if err := f(); err != nil {
+ ecb.wgError = err
+ }
+ ecb.wg.Done()
+ }()
+}
+
+func (ecb *ecBalancer) wgWait() error {
+ if ecb.wg != nil {
+ ecb.wg.Wait()
+ }
+ err := ecb.wgError
+ ecb.wg = nil
+ ecb.wgError = nil
+
+ return err
}
func (ecb *ecBalancer) racks() map[RackId]*EcRack {
@@ -592,15 +635,15 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
}
func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
- // vid => []ecNode
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
- // deduplicate ec shards
+
+ ecb.wgInit()
for vid, locations := range vidLocations {
- if err := ecb.doDeduplicateEcShards(collection, vid, locations); err != nil {
- return err
- }
+ ecb.wgAdd(func() error {
+ return ecb.doDeduplicateEcShards(collection, vid, locations)
+ })
}
- return nil
+ return ecb.wgWait()
}
func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
@@ -636,6 +679,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
return nil
}
+// TODO: enable parallelization
func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
@@ -741,6 +785,7 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR
return targets[rand.IntN(len(targets))], nil
}
+// TODO: enable parallelization
func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
@@ -809,6 +854,7 @@ func (ecb *ecBalancer) balanceEcRacks() error {
return nil
}
+// TODO: enable parallelization
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
if len(ecRack.ecNodes) <= 1 {
return nil
@@ -1001,7 +1047,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
return vidLocations
}
-func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) {
+func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, parallelize bool, applyBalancing bool) (err error) {
if len(collections) == 0 {
return fmt.Errorf("no collections to balance")
}
@@ -1020,6 +1066,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
ecNodes: allEcNodes,
replicaPlacement: ecReplicaPlacement,
applyBalancing: applyBalancing,
+ parallelize: parallelize,
}
for _, c := range collections {