aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2024-12-17 18:39:51 +0100
committerGitHub <noreply@github.com>2024-12-17 09:39:51 -0800
commit9fbc4ea417d7af358270a53fdd4f7f37a37d82d0 (patch)
treeea6c08caa323e27b65eb23d19eee9d7b6f01db5b
parente77e50886e937cd63175878ece20e0e0dbfc81ff (diff)
downloadseaweedfs-9fbc4ea417d7af358270a53fdd4f7f37a37d82d0.tar.xz
seaweedfs-9fbc4ea417d7af358270a53fdd4f7f37a37d82d0.zip
Rework `shell.EcBalance()`'s waitgroup code into a standalone type. (#6373)
Rework `shell.EcBalance()`'s error-collecting waitgroup code into a standalone type. We'll re-use this for other EC jobs — for example, volume creation. Also fixes potential concurrency issues when collecting error results.
-rw-r--r--  weed/shell/command_ec_common.go | 115
1 file changed, 61 insertions(+), 54 deletions(-)
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index c73e342db..a09e2ad62 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -113,6 +113,50 @@ var (
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
+type ErrorWaitGroup struct {
+ parallelize bool
+ wg *sync.WaitGroup
+ errors []error
+ errorsMu sync.Mutex
+}
+type ErrorWaitGroupTask func() error
+
+func (ewg *ErrorWaitGroup) Init() {
+ if ewg.wg != nil {
+ return
+ }
+ ewg.wg = &sync.WaitGroup{}
+ ewg.errors = nil
+}
+
+func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
+ if ewg.wg == nil || !ewg.parallelize {
+ ewg.errors = append(ewg.errors, f())
+ return
+ }
+
+ ewg.wg.Add(1)
+ go func() {
+ err := f()
+ ewg.errorsMu.Lock()
+ ewg.errors = append(ewg.errors, err)
+ ewg.errorsMu.Unlock()
+ ewg.wg.Done()
+ }()
+}
+
+func (ewg *ErrorWaitGroup) Wait() error {
+ if ewg.wg != nil {
+ ewg.wg.Wait()
+ }
+
+ err := errors.Join(ewg.errors...)
+ ewg.wg = nil
+ ewg.errors = nil
+
+ return err
+}
+
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
var resp *master_pb.GetMasterConfigurationResponse
var err error
@@ -557,48 +601,8 @@ type ecBalancer struct {
ecNodes []*EcNode
replicaPlacement *super_block.ReplicaPlacement
applyBalancing bool
- parallelize bool
- wg *sync.WaitGroup
- wgErrors []error
-}
-
-type ecBalancerTask func() error
-
-func (ecb *ecBalancer) wgInit() {
- if ecb.wg != nil {
- return
- }
- ecb.wg = &sync.WaitGroup{}
- ecb.wgErrors = nil
-}
-
-func (ecb *ecBalancer) wgAdd(f ecBalancerTask) {
- if ecb.wg == nil || !ecb.parallelize {
- if err := f(); err != nil {
- ecb.wgErrors = append(ecb.wgErrors, err)
- }
- return
- }
-
- ecb.wg.Add(1)
- go func() {
- if err := f(); err != nil {
- ecb.wgErrors = append(ecb.wgErrors, err)
- }
- ecb.wg.Done()
- }()
-}
-
-func (ecb *ecBalancer) wgWait() error {
- if ecb.wg != nil {
- ecb.wg.Wait()
- }
- err := errors.Join(ecb.wgErrors...)
- ecb.wg = nil
- ecb.wgErrors = nil
-
- return err
+ ewg ErrorWaitGroup
}
func (ecb *ecBalancer) racks() map[RackId]*EcRack {
@@ -637,13 +641,13 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
- ecb.wgInit()
+ ecb.ewg.Init()
for vid, locations := range vidLocations {
- ecb.wgAdd(func() error {
+ ecb.ewg.Add(func() error {
return ecb.doDeduplicateEcShards(collection, vid, locations)
})
}
- return ecb.wgWait()
+ return ecb.ewg.Wait()
}
func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
@@ -684,13 +688,13 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
// spread the ec shards evenly
- ecb.wgInit()
+ ecb.ewg.Init()
for vid, locations := range vidLocations {
- ecb.wgAdd(func() error {
+ ecb.ewg.Add(func() error {
return ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations)
})
}
- return ecb.wgWait()
+ return ecb.ewg.Wait()
}
func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
@@ -792,7 +796,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
racks := ecb.racks()
// spread the ec shards evenly
- ecb.wgInit()
+ ecb.ewg.Init()
for vid, locations := range vidLocations {
// see the volume's shards are in how many racks, and how many in each rack
@@ -811,12 +815,12 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
}
sourceEcNodes := rackEcNodesWithVid[rackId]
averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
- ecb.wgAdd(func() error {
+ ecb.ewg.Add(func() error {
return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes)
})
}
}
- return ecb.wgWait()
+ return ecb.ewg.Wait()
}
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
@@ -847,13 +851,13 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int
func (ecb *ecBalancer) balanceEcRacks() error {
// balance one rack for all ec shards
- ecb.wgInit()
+ ecb.ewg.Init()
for _, ecRack := range ecb.racks() {
- ecb.wgAdd(func() error {
+ ecb.ewg.Add(func() error {
return ecb.doBalanceEcRack(ecRack)
})
}
- return ecb.wgWait()
+ return ecb.ewg.Wait()
}
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
@@ -1067,7 +1071,10 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
ecNodes: allEcNodes,
replicaPlacement: ecReplicaPlacement,
applyBalancing: applyBalancing,
- parallelize: parallelize,
+
+ ewg: ErrorWaitGroup{
+ parallelize: parallelize,
+ },
}
for _, c := range collections {