Diffstat (limited to 'weed/shell/command_ec_rebuild.go')
-rw-r--r--    weed/shell/command_ec_rebuild.go    143
1 file changed, 109 insertions(+), 34 deletions(-)
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 3b2c63cc7..79acebff1 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
+ "sync"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -18,12 +19,14 @@ func init() {
}
type ecRebuilder struct {
- // TODO: add ErrorWaitGroup for parallelization
commandEnv *CommandEnv
ecNodes []*EcNode
writer io.Writer
applyChanges bool
collections []string
+
+ ewg *ErrorWaitGroup
+ ecNodesMu sync.Mutex
}
type commandEcRebuild struct {
@@ -36,7 +39,14 @@ func (c *commandEcRebuild) Name() string {
func (c *commandEcRebuild) Help() string {
return `find and rebuild missing ec shards among volume servers
- ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply]
+ ec.rebuild [-collection EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N]
+
+ Options:
+ -collection: specify a collection name, or "EACH_COLLECTION" to process all collections
+ -apply: actually perform the rebuild operations (default is dry-run mode)
+ -maxParallelization: number of volumes to rebuild concurrently (default: 10)
+ Increase for faster rebuilds with more system resources.
+ Decrease if experiencing resource contention or instability.
Algorithm:
@@ -71,13 +81,13 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
+ maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
applyChanges := fixCommand.Bool("apply", false, "apply the changes")
// TODO: remove this alias
applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
if err = fixCommand.Parse(args); err != nil {
return nil
}
-
handleDeprecatedForceFlag(writer, fixCommand, applyChangesAlias, applyChanges)
infoAboutSimulationMode(writer, *applyChanges, "-apply")
@@ -107,17 +117,16 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
writer: writer,
applyChanges: *applyChanges,
collections: collections,
+
+ ewg: NewErrorWaitGroup(*maxParallelization),
}
fmt.Printf("rebuildEcVolumes for %d collection(s)\n", len(collections))
for _, c := range collections {
- fmt.Printf("rebuildEcVolumes collection %s\n", c)
- if err = erb.rebuildEcVolumes(c); err != nil {
- return err
- }
+ erb.rebuildEcVolumes(c)
}
- return nil
+ return erb.ewg.Wait()
}
func (erb *ecRebuilder) write(format string, a ...any) {
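For reference, the -maxParallelization flag above feeds NewErrorWaitGroup, which bounds how many rebuild tasks run concurrently and lets the final Wait() report any errors the tasks returned. Below is a minimal sketch of such a helper, assuming a semaphore-style implementation; the actual ErrorWaitGroup in weed/shell may differ, and the names in the sketch are hypothetical.

// Illustration only: a bounded-concurrency, error-collecting wait group in the
// spirit of what Do() uses above. Not the real weed/shell ErrorWaitGroup.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type errorWaitGroupSketch struct {
	wg   sync.WaitGroup
	sem  chan struct{} // limits how many tasks run at once
	mu   sync.Mutex
	errs []error
}

func newErrorWaitGroupSketch(maxParallelization int) *errorWaitGroupSketch {
	return &errorWaitGroupSketch{sem: make(chan struct{}, maxParallelization)}
}

// Add schedules a task; it starts as soon as a concurrency slot is free.
func (e *errorWaitGroupSketch) Add(task func() error) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		e.sem <- struct{}{}        // acquire a slot
		defer func() { <-e.sem }() // release it when the task finishes
		if err := task(); err != nil {
			e.mu.Lock()
			e.errs = append(e.errs, err)
			e.mu.Unlock()
		}
	}()
}

// Wait blocks until all scheduled tasks finish and returns their combined errors.
func (e *errorWaitGroupSketch) Wait() error {
	e.wg.Wait()
	return errors.Join(e.errs...)
}

func main() {
	ewg := newErrorWaitGroupSketch(2) // e.g. -maxParallelization 2
	for i := 0; i < 5; i++ {
		i := i
		ewg.Add(func() error {
			if i == 3 {
				return fmt.Errorf("task %d failed", i)
			}
			return nil
		})
	}
	fmt.Println(ewg.Wait()) // prints the collected error(s), or <nil>
}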
@@ -128,30 +137,87 @@ func (erb *ecRebuilder) isLocked() bool {
return erb.commandEnv.isLocked()
}
-// ecNodeWithMoreFreeSlots returns the EC node with higher free slot count, from all nodes visible to the rebuilder.
-func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() *EcNode {
+// countLocalShards returns the number of shards already present locally on the node for the given volume.
+func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volumeId needle.VolumeId) int {
+ for _, diskInfo := range node.info.DiskInfos {
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
+ shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ return len(shardBits.ShardIds())
+ }
+ }
+ }
+ return 0
+}
+
+// selectAndReserveRebuilder atomically selects a rebuilder node with sufficient free slots
+// and reserves slots only for the non-local shards that need to be copied/generated.
+func (erb *ecRebuilder) selectAndReserveRebuilder(collection string, volumeId needle.VolumeId) (*EcNode, int, error) {
+ erb.ecNodesMu.Lock()
+ defer erb.ecNodesMu.Unlock()
+
if len(erb.ecNodes) == 0 {
- return nil
+ return nil, 0, fmt.Errorf("no ec nodes available")
}
- res := erb.ecNodes[0]
- for i := 1; i < len(erb.ecNodes); i++ {
- if erb.ecNodes[i].freeEcSlot > res.freeEcSlot {
- res = erb.ecNodes[i]
+ // Pick the node with the most free slots among those that can hold all of the volume's non-local shards
+ var bestNode *EcNode
+ var bestSlotsNeeded int
+ var maxAvailableSlots int
+ var minSlotsNeeded int = erasure_coding.TotalShardsCount // Start with maximum possible
+ for _, node := range erb.ecNodes {
+ localShards := erb.countLocalShards(node, collection, volumeId)
+ slotsNeeded := erasure_coding.TotalShardsCount - localShards
+ if slotsNeeded < 0 {
+ slotsNeeded = 0
+ }
+
+ if node.freeEcSlot > maxAvailableSlots {
+ maxAvailableSlots = node.freeEcSlot
+ }
+
+ if slotsNeeded < minSlotsNeeded {
+ minSlotsNeeded = slotsNeeded
+ }
+
+ if node.freeEcSlot >= slotsNeeded {
+ if bestNode == nil || node.freeEcSlot > bestNode.freeEcSlot {
+ bestNode = node
+ bestSlotsNeeded = slotsNeeded
+ }
}
}
- return res
+ if bestNode == nil {
+ return nil, 0, fmt.Errorf("no node has sufficient free slots for volume %d (need at least %d slots, max available: %d)",
+ volumeId, minSlotsNeeded, maxAvailableSlots)
+ }
+
+ // Reserve slots only for non-local shards
+ bestNode.freeEcSlot -= bestSlotsNeeded
+
+ return bestNode, bestSlotsNeeded, nil
}
-func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
- fmt.Printf("rebuildEcVolumes %s\n", collection)
+// releaseRebuilder releases the reserved slots back to the rebuilder node.
+func (erb *ecRebuilder) releaseRebuilder(node *EcNode, slotsToRelease int) {
+ erb.ecNodesMu.Lock()
+ defer erb.ecNodesMu.Unlock()
+
+ // Release slots by incrementing the free slot count
+ node.freeEcSlot += slotsToRelease
+}
+
+func (erb *ecRebuilder) rebuildEcVolumes(collection string) {
+ fmt.Printf("rebuildEcVolumes for %q\n", collection)
// collect vid => each shard locations, similar to ecShardMap in topology.go
ecShardMap := make(EcShardMap)
+ erb.ecNodesMu.Lock()
for _, ecNode := range erb.ecNodes {
ecShardMap.registerEcNode(ecNode, collection)
}
+ erb.ecNodesMu.Unlock()
for vid, locations := range ecShardMap {
shardCount := locations.shardCount()
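For reference, the slot accounting introduced above boils down to counting the shards a node already holds (the set bits of EcIndexBits) and reserving capacity only for the rest. A hedged sketch of that arithmetic follows; the constant and the direct bit count are illustrative, while the real code goes through erasure_coding.ShardBits.

// Illustration only: the shard accounting behind countLocalShards and
// selectAndReserveRebuilder. EcIndexBits is a bitmask in which bit i means
// "shard i is present on this node"; the rebuilder only needs slots for the
// shards it does not already hold.
package main

import (
	"fmt"
	"math/bits"
)

const totalShardsCount = 14 // 10 data + 4 parity shards in SeaweedFS erasure coding

func main() {
	// Example: a candidate rebuilder already holds shards 0, 3, and 7 of the volume.
	ecIndexBits := uint32(1<<0 | 1<<3 | 1<<7)

	localShards := bits.OnesCount32(ecIndexBits) // what countLocalShards derives via ShardBits
	slotsNeeded := totalShardsCount - localShards

	fmt.Printf("local shards: %d, slots to reserve: %d\n", localShards, slotsNeeded)
	// prints: local shards: 3, slots to reserve: 11
}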
@@ -159,31 +225,37 @@ func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
continue
}
if shardCount < erasure_coding.DataShardsCount {
- return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+ // Capture variables for closure
+ vid := vid
+ shardCount := shardCount
+ erb.ewg.Add(func() error {
+ return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+ })
+ continue
}
- if err := erb.rebuildOneEcVolume(collection, vid, locations); err != nil {
- return err
- }
- }
+ // Capture variables for closure
+ vid := vid
+ locations := locations
- return nil
+ erb.ewg.Add(func() error {
+ // Select rebuilder and reserve slots atomically per volume
+ rebuilder, slotsToReserve, err := erb.selectAndReserveRebuilder(collection, vid)
+ if err != nil {
+ return fmt.Errorf("failed to select rebuilder for volume %d: %v", vid, err)
+ }
+ defer erb.releaseRebuilder(rebuilder, slotsToReserve)
+
+ return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder)
+ })
+ }
}
-func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations) error {
+func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations, rebuilder *EcNode) error {
if !erb.isLocked() {
return fmt.Errorf("lock is lost")
}
- // TODO: fix this logic so it supports concurrent executions
- rebuilder := erb.ecNodeWithMoreFreeSlots()
- if rebuilder == nil {
- return fmt.Errorf("no EC nodes available for rebuild")
- }
- if rebuilder.freeEcSlot < erasure_coding.TotalShardsCount {
- return fmt.Errorf("disk space is not enough")
- }
-
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
// collect shard files to rebuilder local disk
@@ -219,6 +291,9 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
return err
}
+ // ensure updates to the shared EcNode are serialized
+ erb.ecNodesMu.Lock()
+ defer erb.ecNodesMu.Unlock()
rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
return nil