-rw-r--r--  weed/shell/command_ec_rebuild.go       143
-rw-r--r--  weed/shell/command_ec_rebuild_test.go   92
2 files changed, 130 insertions, 105 deletions
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 3b2c63cc7..79acebff1 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
+ "sync"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -18,12 +19,14 @@ func init() {
}
type ecRebuilder struct {
- // TODO: add ErrorWaitGroup for parallelization
commandEnv *CommandEnv
ecNodes []*EcNode
writer io.Writer
applyChanges bool
collections []string
+
+ ewg *ErrorWaitGroup
+ ecNodesMu sync.Mutex
}
type commandEcRebuild struct {
@@ -36,7 +39,14 @@ func (c *commandEcRebuild) Name() string {
func (c *commandEcRebuild) Help() string {
return `find and rebuild missing ec shards among volume servers
- ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply]
+ ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N]
+
+ Options:
+ -collection: specify a collection name, or "EACH_COLLECTION" to process all collections
+ -apply: actually perform the rebuild operations (default is dry-run mode)
+ -maxParallelization: number of volumes to rebuild concurrently (default: 10)
+ Increase for faster rebuilds with more system resources.
+ Decrease if experiencing resource contention or instability.
Algorithm:
@@ -71,13 +81,13 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
+ maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
applyChanges := fixCommand.Bool("apply", false, "apply the changes")
// TODO: remove this alias
applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
if err = fixCommand.Parse(args); err != nil {
return nil
}
-
handleDeprecatedForceFlag(writer, fixCommand, applyChangesAlias, applyChanges)
infoAboutSimulationMode(writer, *applyChanges, "-apply")
@@ -107,17 +117,16 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
writer: writer,
applyChanges: *applyChanges,
collections: collections,
+
+ ewg: NewErrorWaitGroup(*maxParallelization),
}
fmt.Printf("rebuildEcVolumes for %d collection(s)\n", len(collections))
for _, c := range collections {
- fmt.Printf("rebuildEcVolumes collection %s\n", c)
- if err = erb.rebuildEcVolumes(c); err != nil {
- return err
- }
+ erb.rebuildEcVolumes(c)
}
- return nil
+ return erb.ewg.Wait()
}
func (erb *ecRebuilder) write(format string, a ...any) {
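
Note: the Do() hunk above queues each collection on erb.ewg and defers error handling to a single Wait() call at the end. ErrorWaitGroup is defined elsewhere in the weed/shell package and is not part of this diff; the following is only a minimal sketch, under the assumption that it provides the semantics this change relies on (tasks run concurrently up to maxParallelization, and Wait joins every collected error). The errorWaitGroup type below is a hypothetical stand-in, not the real implementation.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errorWaitGroup is a hypothetical stand-in for the shell package's
// ErrorWaitGroup: tasks run concurrently, capped by a semaphore, and
// Wait() returns all collected errors joined together.
type errorWaitGroup struct {
	wg   sync.WaitGroup
	sem  chan struct{} // caps the number of tasks in flight
	mu   sync.Mutex
	errs []error
}

func newErrorWaitGroup(maxParallelization int) *errorWaitGroup {
	return &errorWaitGroup{sem: make(chan struct{}, maxParallelization)}
}

func (g *errorWaitGroup) Add(task func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		g.sem <- struct{}{}        // acquire a slot
		defer func() { <-g.sem }() // release it when the task finishes
		if err := task(); err != nil {
			g.mu.Lock()
			g.errs = append(g.errs, err)
			g.mu.Unlock()
		}
	}()
}

func (g *errorWaitGroup) Wait() error {
	g.wg.Wait()
	return errors.Join(g.errs...)
}

func main() {
	g := newErrorWaitGroup(2) // at most 2 tasks in flight, mirroring -maxParallelization
	for i := 0; i < 5; i++ {
		i := i
		g.Add(func() error {
			if i == 3 {
				return fmt.Errorf("task %d failed", i)
			}
			return nil
		})
	}
	fmt.Println(g.Wait()) // prints the joined error from task 3
}

Under these assumed semantics, a single unrepairable volume no longer aborts the whole loop; its error is reported alongside any others once every queued rebuild has finished.
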
@@ -128,30 +137,87 @@ func (erb *ecRebuilder) isLocked() bool {
return erb.commandEnv.isLocked()
}
-// ecNodeWithMoreFreeSlots returns the EC node with higher free slot count, from all nodes visible to the rebuilder.
-func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() *EcNode {
+// countLocalShards returns the number of shards already present locally on the node for the given volume.
+func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volumeId needle.VolumeId) int {
+ for _, diskInfo := range node.info.DiskInfos {
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
+ shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ return len(shardBits.ShardIds())
+ }
+ }
+ }
+ return 0
+}
+
+// selectAndReserveRebuilder atomically selects a rebuilder node with sufficient free slots
+// and reserves slots only for the non-local shards that need to be copied/generated.
+func (erb *ecRebuilder) selectAndReserveRebuilder(collection string, volumeId needle.VolumeId) (*EcNode, int, error) {
+ erb.ecNodesMu.Lock()
+ defer erb.ecNodesMu.Unlock()
+
if len(erb.ecNodes) == 0 {
- return nil
+ return nil, 0, fmt.Errorf("no ec nodes available")
}
- res := erb.ecNodes[0]
- for i := 1; i < len(erb.ecNodes); i++ {
- if erb.ecNodes[i].freeEcSlot > res.freeEcSlot {
- res = erb.ecNodes[i]
+ // Find the node with the most free slots, considering local shards
+ var bestNode *EcNode
+ var bestSlotsNeeded int
+ var maxAvailableSlots int
+ var minSlotsNeeded int = erasure_coding.TotalShardsCount // Start with maximum possible
+ for _, node := range erb.ecNodes {
+ localShards := erb.countLocalShards(node, collection, volumeId)
+ slotsNeeded := erasure_coding.TotalShardsCount - localShards
+ if slotsNeeded < 0 {
+ slotsNeeded = 0
+ }
+
+ if node.freeEcSlot > maxAvailableSlots {
+ maxAvailableSlots = node.freeEcSlot
+ }
+
+ if slotsNeeded < minSlotsNeeded {
+ minSlotsNeeded = slotsNeeded
+ }
+
+ if node.freeEcSlot >= slotsNeeded {
+ if bestNode == nil || node.freeEcSlot > bestNode.freeEcSlot {
+ bestNode = node
+ bestSlotsNeeded = slotsNeeded
+ }
}
}
- return res
+ if bestNode == nil {
+ return nil, 0, fmt.Errorf("no node has sufficient free slots for volume %d (need at least %d slots, max available: %d)",
+ volumeId, minSlotsNeeded, maxAvailableSlots)
+ }
+
+ // Reserve slots only for non-local shards
+ bestNode.freeEcSlot -= bestSlotsNeeded
+
+ return bestNode, bestSlotsNeeded, nil
}
-func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
- fmt.Printf("rebuildEcVolumes %s\n", collection)
+// releaseRebuilder releases the reserved slots back to the rebuilder node.
+func (erb *ecRebuilder) releaseRebuilder(node *EcNode, slotsToRelease int) {
+ erb.ecNodesMu.Lock()
+ defer erb.ecNodesMu.Unlock()
+
+ // Release slots by incrementing the free slot count
+ node.freeEcSlot += slotsToRelease
+}
+
+func (erb *ecRebuilder) rebuildEcVolumes(collection string) {
+ fmt.Printf("rebuildEcVolumes for %q\n", collection)
// collect vid => each shard locations, similar to ecShardMap in topology.go
ecShardMap := make(EcShardMap)
+ erb.ecNodesMu.Lock()
for _, ecNode := range erb.ecNodes {
ecShardMap.registerEcNode(ecNode, collection)
}
+ erb.ecNodesMu.Unlock()
for vid, locations := range ecShardMap {
shardCount := locations.shardCount()
@@ -159,31 +225,37 @@ func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
continue
}
if shardCount < erasure_coding.DataShardsCount {
- return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+ // Capture variables for closure
+ vid := vid
+ shardCount := shardCount
+ erb.ewg.Add(func() error {
+ return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+ })
+ continue
}
- if err := erb.rebuildOneEcVolume(collection, vid, locations); err != nil {
- return err
- }
- }
+ // Capture variables for closure
+ vid := vid
+ locations := locations
- return nil
+ erb.ewg.Add(func() error {
+ // Select rebuilder and reserve slots atomically per volume
+ rebuilder, slotsToReserve, err := erb.selectAndReserveRebuilder(collection, vid)
+ if err != nil {
+ return fmt.Errorf("failed to select rebuilder for volume %d: %v", vid, err)
+ }
+ defer erb.releaseRebuilder(rebuilder, slotsToReserve)
+
+ return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder)
+ })
+ }
}
-func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations) error {
+func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations, rebuilder *EcNode) error {
if !erb.isLocked() {
return fmt.Errorf("lock is lost")
}
- // TODO: fix this logic so it supports concurrent executions
- rebuilder := erb.ecNodeWithMoreFreeSlots()
- if rebuilder == nil {
- return fmt.Errorf("no EC nodes available for rebuild")
- }
- if rebuilder.freeEcSlot < erasure_coding.TotalShardsCount {
- return fmt.Errorf("disk space is not enough")
- }
-
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
// collect shard files to rebuilder local disk
@@ -219,6 +291,9 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
return err
}
+ // ensure ECNode updates are atomic
+ erb.ecNodesMu.Lock()
+ defer erb.ecNodesMu.Unlock()
rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
return nil
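
The core concurrency change in this file is that rebuilder selection, slot reservation, and slot release all happen under ecNodesMu, and every reservation is paired with a deferred release inside the per-volume worker closure. A self-contained sketch of that pattern, using a hypothetical reserver/node pair rather than the real ecRebuilder/EcNode types:

package main

import (
	"fmt"
	"sync"
)

// node is a stripped-down stand-in for EcNode: just a free-slot counter.
type node struct {
	id       string
	freeSlot int
}

// reserver illustrates the pattern introduced above: selection and
// reservation happen under one mutex, so the parallel workers spawned by
// the ErrorWaitGroup cannot oversubscribe the same node.
type reserver struct {
	mu    sync.Mutex
	nodes []*node
}

func (r *reserver) reserve(slots int) (*node, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	var best *node
	for _, n := range r.nodes {
		if n.freeSlot >= slots && (best == nil || n.freeSlot > best.freeSlot) {
			best = n
		}
	}
	if best == nil {
		return nil, fmt.Errorf("no node has %d free slots", slots)
	}
	best.freeSlot -= slots // charge the reservation while still holding the lock
	return best, nil
}

func (r *reserver) release(n *node, slots int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	n.freeSlot += slots
}

func main() {
	r := &reserver{nodes: []*node{{id: "node1", freeSlot: 6}}}

	first, err := r.reserve(4) // succeeds; node1 drops to 2 free slots
	if err != nil {
		panic(err)
	}

	if _, err := r.reserve(4); err != nil {
		fmt.Println(err) // only 2 slots left while the first rebuild is in flight
	}

	r.release(first, 4) // the real code does this via defer releaseRebuilder(...)
	fmt.Println("free slots after release:", r.nodes[0].freeSlot) // 6
}
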
diff --git a/weed/shell/command_ec_rebuild_test.go b/weed/shell/command_ec_rebuild_test.go
index 5ab431137..b732e210a 100644
--- a/weed/shell/command_ec_rebuild_test.go
+++ b/weed/shell/command_ec_rebuild_test.go
@@ -79,69 +79,6 @@ func TestEcShardMapShardCount(t *testing.T) {
}
}
-// TestEcRebuilderEcNodeWithMoreFreeSlots tests the free slot selection
-func TestEcRebuilderEcNodeWithMoreFreeSlots(t *testing.T) {
- testCases := []struct {
- name string
- nodes []*EcNode
- expectedNode string
- }{
- {
- name: "single node",
- nodes: []*EcNode{
- newEcNode("dc1", "rack1", "node1", 100),
- },
- expectedNode: "node1",
- },
- {
- name: "multiple nodes - select highest",
- nodes: []*EcNode{
- newEcNode("dc1", "rack1", "node1", 50),
- newEcNode("dc1", "rack1", "node2", 150),
- newEcNode("dc1", "rack1", "node3", 100),
- },
- expectedNode: "node2",
- },
- {
- name: "multiple nodes - same slots",
- nodes: []*EcNode{
- newEcNode("dc1", "rack1", "node1", 100),
- newEcNode("dc1", "rack1", "node2", 100),
- },
- expectedNode: "node1", // Should return first one
- },
- }
-
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- erb := &ecRebuilder{
- ecNodes: tc.nodes,
- }
-
- node := erb.ecNodeWithMoreFreeSlots()
- if node == nil {
- t.Fatal("Expected a node, got nil")
- }
-
- if node.info.Id != tc.expectedNode {
- t.Errorf("Expected node %s, got %s", tc.expectedNode, node.info.Id)
- }
- })
- }
-}
-
-// TestEcRebuilderEcNodeWithMoreFreeSlotsEmpty tests empty node list
-func TestEcRebuilderEcNodeWithMoreFreeSlotsEmpty(t *testing.T) {
- erb := &ecRebuilder{
- ecNodes: []*EcNode{},
- }
-
- node := erb.ecNodeWithMoreFreeSlots()
- if node != nil {
- t.Errorf("Expected nil for empty node list, got %v", node)
- }
-}
-
// TestRebuildEcVolumesInsufficientShards tests error handling for unrepairable volumes
func TestRebuildEcVolumesInsufficientShards(t *testing.T) {
var logBuffer bytes.Buffer
@@ -155,15 +92,17 @@ func TestRebuildEcVolumesInsufficientShards(t *testing.T) {
env: make(map[string]string),
noLock: true, // Bypass lock check for unit test
},
+ ewg: NewErrorWaitGroup(DefaultMaxParallelization),
ecNodes: []*EcNode{node1},
writer: &logBuffer,
}
- err := erb.rebuildEcVolumes("c1")
+ erb.rebuildEcVolumes("c1")
+ err := erb.ewg.Wait()
+
if err == nil {
t.Fatal("Expected error for insufficient shards, got nil")
}
-
if !strings.Contains(err.Error(), "unrepairable") {
t.Errorf("Expected 'unrepairable' in error message, got: %s", err.Error())
}
@@ -182,12 +121,15 @@ func TestRebuildEcVolumesCompleteVolume(t *testing.T) {
env: make(map[string]string),
noLock: true, // Bypass lock check for unit test
},
+ ewg: NewErrorWaitGroup(DefaultMaxParallelization),
ecNodes: []*EcNode{node1},
writer: &logBuffer,
applyChanges: false,
}
- err := erb.rebuildEcVolumes("c1")
+ erb.rebuildEcVolumes("c1")
+ err := erb.ewg.Wait()
+
if err != nil {
t.Fatalf("Expected no error for complete volume, got: %v", err)
}
@@ -201,7 +143,9 @@ func TestRebuildEcVolumesInsufficientSpace(t *testing.T) {
var logBuffer bytes.Buffer
// Create a volume with missing shards but insufficient free slots
- node1 := newEcNode("dc1", "rack1", "node1", 5). // Only 5 free slots, need 14
+ // Node has 10 local shards, missing 4 shards (10,11,12,13), so needs 4 free slots
+ // Set free slots to 3 (insufficient)
+ node1 := newEcNode("dc1", "rack1", "node1", 3). // Only 3 free slots, need 4
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
erb := &ecRebuilder{
@@ -209,18 +153,24 @@ func TestRebuildEcVolumesInsufficientSpace(t *testing.T) {
env: make(map[string]string),
noLock: true, // Bypass lock check for unit test
},
+ ewg: NewErrorWaitGroup(DefaultMaxParallelization),
ecNodes: []*EcNode{node1},
writer: &logBuffer,
applyChanges: false,
}
- err := erb.rebuildEcVolumes("c1")
+ erb.rebuildEcVolumes("c1")
+ err := erb.ewg.Wait()
+
if err == nil {
t.Fatal("Expected error for insufficient disk space, got nil")
}
-
- if !strings.Contains(err.Error(), "disk space is not enough") {
- t.Errorf("Expected 'disk space' in error message, got: %s", err.Error())
+ if !strings.Contains(err.Error(), "no node has sufficient free slots") {
+ t.Errorf("Expected 'no node has sufficient free slots' in error message, got: %s", err.Error())
+ }
+ // Verify the enhanced error message includes diagnostic information
+ if !strings.Contains(err.Error(), "need") || !strings.Contains(err.Error(), "max available") {
+ t.Errorf("Expected diagnostic information in error message, got: %s", err.Error())
}
}
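
For reference, the updated insufficient-space test depends on the same arithmetic that selectAndReserveRebuilder now uses: with the default 10+4 erasure coding layout there are 14 shards in total, so a node that already holds shards 0-9 only needs 4 free slots, and giving it just 3 triggers the "no node has sufficient free slots" error. A tiny sketch of that calculation, with the shard counts assumed from the defaults rather than taken from this diff:

package main

import "fmt"

// Shard counts assumed from the default erasure coding layout
// (10 data + 4 parity), matching the comments in the test above.
const (
	dataShardsCount   = 10
	parityShardsCount = 4
	totalShardsCount  = dataShardsCount + parityShardsCount
)

// slotsNeeded mirrors the reservation rule: only shards that are not
// already present on the candidate node consume free slots.
func slotsNeeded(localShards int) int {
	if localShards >= totalShardsCount {
		return 0
	}
	return totalShardsCount - localShards
}

func main() {
	fmt.Println(slotsNeeded(10)) // 4  -- the test node holds shards 0-9, so 3 free slots is not enough
	fmt.Println(slotsNeeded(0))  // 14 -- an empty node must have room for every shard
}
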