aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_common.go
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2024-12-18 22:26:26 +0100
committerGitHub <noreply@github.com>2024-12-18 13:26:26 -0800
commitba0707af641e41ba3cbed2b533ed2432d21295ba (patch)
tree967d95ec034ea3d3adbcea86662be3ab043a40d7 /weed/shell/command_ec_common.go
parent44c48c929ac2e513a3ad5749744c77ab480ae1fe (diff)
downloadseaweedfs-ba0707af641e41ba3cbed2b533ed2432d21295ba.tar.xz
seaweedfs-ba0707af641e41ba3cbed2b533ed2432d21295ba.zip
Allow configuring the maximum number of concurrent tasks for EC parallelization. (#6376)
Follow-up to b0210df0.
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--weed/shell/command_ec_common.go93
1 files changed, 48 insertions, 45 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index a09e2ad62..e4629905e 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -114,47 +114,50 @@ var (
)
type ErrorWaitGroup struct {
- parallelize bool
- wg *sync.WaitGroup
- errors []error
- errorsMu sync.Mutex
+ maxConcurrency int
+ wg *sync.WaitGroup
+ wgSem chan bool
+ errors []error
+ errorsMu sync.Mutex
}
type ErrorWaitGroupTask func() error
-func (ewg *ErrorWaitGroup) Init() {
- if ewg.wg != nil {
- return
+func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
+ if maxConcurrency <= 0 {
+ // No concurrency = one task at the time
+ maxConcurrency = 1
+ }
+ return &ErrorWaitGroup{
+ maxConcurrency: maxConcurrency,
+ wg: &sync.WaitGroup{},
+ wgSem: make(chan bool, maxConcurrency),
}
- ewg.wg = &sync.WaitGroup{}
- ewg.errors = nil
}
func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
- if ewg.wg == nil || !ewg.parallelize {
+ if ewg.maxConcurrency <= 1 {
+ // Keep run order deterministic when parallelization is off
ewg.errors = append(ewg.errors, f())
return
}
ewg.wg.Add(1)
go func() {
+ ewg.wgSem <- true
+
err := f()
ewg.errorsMu.Lock()
ewg.errors = append(ewg.errors, err)
ewg.errorsMu.Unlock()
+
+ <-ewg.wgSem
ewg.wg.Done()
}()
}
func (ewg *ErrorWaitGroup) Wait() error {
- if ewg.wg != nil {
- ewg.wg.Wait()
- }
-
- err := errors.Join(ewg.errors...)
- ewg.wg = nil
- ewg.errors = nil
-
- return err
+ ewg.wg.Wait()
+ return errors.Join(ewg.errors...)
}
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
@@ -597,12 +600,15 @@ func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string]
}
type ecBalancer struct {
- commandEnv *CommandEnv
- ecNodes []*EcNode
- replicaPlacement *super_block.ReplicaPlacement
- applyBalancing bool
+ commandEnv *CommandEnv
+ ecNodes []*EcNode
+ replicaPlacement *super_block.ReplicaPlacement
+ applyBalancing bool
+ maxParallelization int
+}
- ewg ErrorWaitGroup
+func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
+ return NewErrorWaitGroup(ecb.maxParallelization)
}
func (ecb *ecBalancer) racks() map[RackId]*EcRack {
@@ -641,13 +647,13 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
- ecb.ewg.Init()
+ ewg := ecb.errorWaitGroup()
for vid, locations := range vidLocations {
- ecb.ewg.Add(func() error {
+ ewg.Add(func() error {
return ecb.doDeduplicateEcShards(collection, vid, locations)
})
}
- return ecb.ewg.Wait()
+ return ewg.Wait()
}
func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
@@ -688,13 +694,13 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
// spread the ec shards evenly
- ecb.ewg.Init()
+ ewg := ecb.errorWaitGroup()
for vid, locations := range vidLocations {
- ecb.ewg.Add(func() error {
+ ewg.Add(func() error {
return ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations)
})
}
- return ecb.ewg.Wait()
+ return ewg.Wait()
}
func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
@@ -796,7 +802,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
racks := ecb.racks()
// spread the ec shards evenly
- ecb.ewg.Init()
+ ewg := ecb.errorWaitGroup()
for vid, locations := range vidLocations {
// see the volume's shards are in how many racks, and how many in each rack
@@ -815,12 +821,12 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
}
sourceEcNodes := rackEcNodesWithVid[rackId]
averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
- ecb.ewg.Add(func() error {
+ ewg.Add(func() error {
return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes)
})
}
}
- return ecb.ewg.Wait()
+ return ewg.Wait()
}
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
@@ -851,13 +857,13 @@ func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int
func (ecb *ecBalancer) balanceEcRacks() error {
// balance one rack for all ec shards
- ecb.ewg.Init()
+ ewg := ecb.errorWaitGroup()
for _, ecRack := range ecb.racks() {
- ecb.ewg.Add(func() error {
+ ewg.Add(func() error {
return ecb.doBalanceEcRack(ecRack)
})
}
- return ecb.ewg.Wait()
+ return ewg.Wait()
}
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
@@ -1052,7 +1058,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
return vidLocations
}
-func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, parallelize bool, applyBalancing bool) (err error) {
+func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
if len(collections) == 0 {
return fmt.Errorf("no collections to balance")
}
@@ -1067,14 +1073,11 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
}
ecb := &ecBalancer{
- commandEnv: commandEnv,
- ecNodes: allEcNodes,
- replicaPlacement: ecReplicaPlacement,
- applyBalancing: applyBalancing,
-
- ewg: ErrorWaitGroup{
- parallelize: parallelize,
- },
+ commandEnv: commandEnv,
+ ecNodes: allEcNodes,
+ replicaPlacement: ecReplicaPlacement,
+ applyBalancing: applyBalancing,
+ maxParallelization: maxParallelization,
}
for _, c := range collections {