Diffstat (limited to 'weed/shell/command_ec_rebuild.go')
| -rw-r--r-- | weed/shell/command_ec_rebuild.go | 143 |
1 file changed, 109 insertions, 34 deletions
```diff
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 3b2c63cc7..79acebff1 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"sync"
 
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -18,12 +19,14 @@ func init() {
 }
 
 type ecRebuilder struct {
-	// TODO: add ErrorWaitGroup for parallelization
 	commandEnv   *CommandEnv
 	ecNodes      []*EcNode
 	writer       io.Writer
 	applyChanges bool
 	collections  []string
+
+	ewg       *ErrorWaitGroup
+	ecNodesMu sync.Mutex
 }
 
 type commandEcRebuild struct {
@@ -36,7 +39,14 @@ func (c *commandEcRebuild) Name() string {
 
 func (c *commandEcRebuild) Help() string {
 	return `find and rebuild missing ec shards among volume servers
 
-	ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply]
+	ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N]
+
+	Options:
+		-collection: specify a collection name, or "EACH_COLLECTION" to process all collections
+		-apply: actually perform the rebuild operations (default is dry-run mode)
+		-maxParallelization: number of volumes to rebuild concurrently (default: 10)
+			Increase for faster rebuilds with more system resources.
+			Decrease if experiencing resource contention or instability.
 
 	Algorithm:
@@ -71,13 +81,13 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 
 	fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
 	collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
+	maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
 	applyChanges := fixCommand.Bool("apply", false, "apply the changes")
 	// TODO: remove this alias
 	applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
 	if err = fixCommand.Parse(args); err != nil {
 		return nil
 	}
-
 	handleDeprecatedForceFlag(writer, fixCommand, applyChangesAlias, applyChanges)
 
 	infoAboutSimulationMode(writer, *applyChanges, "-apply")
@@ -107,17 +117,16 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 		writer:       writer,
 		applyChanges: *applyChanges,
 		collections:  collections,
+
+		ewg: NewErrorWaitGroup(*maxParallelization),
 	}
 
 	fmt.Printf("rebuildEcVolumes for %d collection(s)\n", len(collections))
 	for _, c := range collections {
-		fmt.Printf("rebuildEcVolumes collection %s\n", c)
-		if err = erb.rebuildEcVolumes(c); err != nil {
-			return err
-		}
+		erb.rebuildEcVolumes(c)
 	}
 
-	return nil
+	return erb.ewg.Wait()
 }
 
 func (erb *ecRebuilder) write(format string, a ...any) {
@@ -128,30 +137,87 @@ func (erb *ecRebuilder) isLocked() bool {
 	return erb.commandEnv.isLocked()
 }
 
-// ecNodeWithMoreFreeSlots returns the EC node with higher free slot count, from all nodes visible to the rebuilder.
-func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() *EcNode {
+// countLocalShards returns the number of shards already present locally on the node for the given volume.
+func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volumeId needle.VolumeId) int {
+	for _, diskInfo := range node.info.DiskInfos {
+		for _, ecShardInfo := range diskInfo.EcShardInfos {
+			if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
+				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+				return len(shardBits.ShardIds())
+			}
+		}
+	}
+	return 0
+}
+
+// selectAndReserveRebuilder atomically selects a rebuilder node with sufficient free slots
+// and reserves slots only for the non-local shards that need to be copied/generated.
+func (erb *ecRebuilder) selectAndReserveRebuilder(collection string, volumeId needle.VolumeId) (*EcNode, int, error) {
+	erb.ecNodesMu.Lock()
+	defer erb.ecNodesMu.Unlock()
+
 	if len(erb.ecNodes) == 0 {
-		return nil
+		return nil, 0, fmt.Errorf("no ec nodes available")
 	}
 
-	res := erb.ecNodes[0]
-	for i := 1; i < len(erb.ecNodes); i++ {
-		if erb.ecNodes[i].freeEcSlot > res.freeEcSlot {
-			res = erb.ecNodes[i]
+	// Find the node with the most free slots, considering local shards
+	var bestNode *EcNode
+	var bestSlotsNeeded int
+	var maxAvailableSlots int
+	var minSlotsNeeded int = erasure_coding.TotalShardsCount // Start with maximum possible
+	for _, node := range erb.ecNodes {
+		localShards := erb.countLocalShards(node, collection, volumeId)
+		slotsNeeded := erasure_coding.TotalShardsCount - localShards
+		if slotsNeeded < 0 {
+			slotsNeeded = 0
+		}
+
+		if node.freeEcSlot > maxAvailableSlots {
+			maxAvailableSlots = node.freeEcSlot
+		}
+
+		if slotsNeeded < minSlotsNeeded {
+			minSlotsNeeded = slotsNeeded
+		}
+
+		if node.freeEcSlot >= slotsNeeded {
+			if bestNode == nil || node.freeEcSlot > bestNode.freeEcSlot {
+				bestNode = node
+				bestSlotsNeeded = slotsNeeded
+			}
 		}
 	}
-	return res
+
+	if bestNode == nil {
+		return nil, 0, fmt.Errorf("no node has sufficient free slots for volume %d (need at least %d slots, max available: %d)",
+			volumeId, minSlotsNeeded, maxAvailableSlots)
+	}
+
+	// Reserve slots only for non-local shards
+	bestNode.freeEcSlot -= bestSlotsNeeded
+
+	return bestNode, bestSlotsNeeded, nil
 }
 
-func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
-	fmt.Printf("rebuildEcVolumes %s\n", collection)
+// releaseRebuilder releases the reserved slots back to the rebuilder node.
+func (erb *ecRebuilder) releaseRebuilder(node *EcNode, slotsToRelease int) {
+	erb.ecNodesMu.Lock()
+	defer erb.ecNodesMu.Unlock()
+
+	// Release slots by incrementing the free slot count
+	node.freeEcSlot += slotsToRelease
+}
+
+func (erb *ecRebuilder) rebuildEcVolumes(collection string) {
+	fmt.Printf("rebuildEcVolumes for %q\n", collection)
 
 	// collect vid => each shard locations, similar to ecShardMap in topology.go
 	ecShardMap := make(EcShardMap)
+	erb.ecNodesMu.Lock()
 	for _, ecNode := range erb.ecNodes {
 		ecShardMap.registerEcNode(ecNode, collection)
 	}
+	erb.ecNodesMu.Unlock()
 
 	for vid, locations := range ecShardMap {
 		shardCount := locations.shardCount()
@@ -159,31 +225,37 @@ func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
 			continue
 		}
 		if shardCount < erasure_coding.DataShardsCount {
-			return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+			// Capture variables for closure
+			vid := vid
+			shardCount := shardCount
+			erb.ewg.Add(func() error {
+				return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+			})
+			continue
 		}
 
-		if err := erb.rebuildOneEcVolume(collection, vid, locations); err != nil {
-			return err
-		}
-	}
+		// Capture variables for closure
+		vid := vid
+		locations := locations
 
-	return nil
+		erb.ewg.Add(func() error {
+			// Select rebuilder and reserve slots atomically per volume
+			rebuilder, slotsToReserve, err := erb.selectAndReserveRebuilder(collection, vid)
+			if err != nil {
+				return fmt.Errorf("failed to select rebuilder for volume %d: %v", vid, err)
+			}
+			defer erb.releaseRebuilder(rebuilder, slotsToReserve)
+
+			return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder)
+		})
+	}
 }
 
-func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations) error {
+func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations, rebuilder *EcNode) error {
 	if !erb.isLocked() {
 		return fmt.Errorf("lock is lost")
 	}
 
-	// TODO: fix this logic so it supports concurrent executions
-	rebuilder := erb.ecNodeWithMoreFreeSlots()
-	if rebuilder == nil {
-		return fmt.Errorf("no EC nodes available for rebuild")
-	}
-	if rebuilder.freeEcSlot < erasure_coding.TotalShardsCount {
-		return fmt.Errorf("disk space is not enough")
-	}
-
 	fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
 
 	// collect shard files to rebuilder local disk
@@ -219,6 +291,9 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
 		return err
 	}
 
+	// ensure ECNode updates are atomic
+	erb.ecNodesMu.Lock()
+	defer erb.ecNodesMu.Unlock()
 	rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
 
 	return nil
```
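The change replaces the serial per-collection loop with tasks queued on an `ErrorWaitGroup` bounded by `-maxParallelization`, and `Do` now returns `erb.ewg.Wait()` so errors from all volumes are collected instead of aborting on the first failure. The `ErrorWaitGroup` type itself lives elsewhere in the `weed/shell` package and is not part of this diff; the sketch below is only an assumed illustration of the semantics the new code relies on (a bounded worker pool that aggregates task errors), not the project's actual implementation.

```go
// Illustrative sketch only: mimics the surface used in the diff
// (NewErrorWaitGroup, Add, Wait) to show how -maxParallelization can bound
// the number of rebuild tasks running at once while still collecting errors.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type ErrorWaitGroup struct {
	wg   sync.WaitGroup
	sem  chan struct{} // capacity = max parallel tasks
	mu   sync.Mutex
	errs []error
}

func NewErrorWaitGroup(maxParallelization int) *ErrorWaitGroup {
	if maxParallelization < 1 {
		maxParallelization = 1
	}
	return &ErrorWaitGroup{sem: make(chan struct{}, maxParallelization)}
}

// Add schedules a task; at most cap(sem) tasks run concurrently.
func (ewg *ErrorWaitGroup) Add(task func() error) {
	ewg.wg.Add(1)
	go func() {
		defer ewg.wg.Done()
		ewg.sem <- struct{}{}        // acquire a slot
		defer func() { <-ewg.sem }() // release it
		if err := task(); err != nil {
			ewg.mu.Lock()
			ewg.errs = append(ewg.errs, err)
			ewg.mu.Unlock()
		}
	}()
}

// Wait blocks until all tasks finish and returns the combined error, if any.
func (ewg *ErrorWaitGroup) Wait() error {
	ewg.wg.Wait()
	return errors.Join(ewg.errs...)
}

func main() {
	ewg := NewErrorWaitGroup(2) // e.g. ec.rebuild -maxParallelization 2
	for i := 0; i < 5; i++ {
		vid := i
		ewg.Add(func() error {
			if vid == 3 {
				return fmt.Errorf("ec volume %d is unrepairable", vid)
			}
			return nil
		})
	}
	fmt.Println(ewg.Wait())
}
```

Under this model, the mutex-protected `selectAndReserveRebuilder`/`releaseRebuilder` pair in the diff is what keeps concurrent tasks from over-committing a node's `freeEcSlot` while their rebuilds are in flight.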
