| author | Lisandro Pin <lisandro.pin@proton.ch> | 2024-11-27 20:51:57 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-11-27 11:51:57 -0800 |
| commit | 559a1fd0f4565bca3e2f4e6f0d90d188c7b3377a (patch) | |
| tree | f02a7ffac330833ba36e8e5d006722f649387ac3 /weed/shell/command_ec_common.go | |
| parent | 88fa64a01ae7ac8782e70dd29a8a9f6bf44a7e19 (diff) | |
| download | seaweedfs-559a1fd0f4565bca3e2f4e6f0d90d188c7b3377a.tar.xz seaweedfs-559a1fd0f4565bca3e2f4e6f0d90d188c7b3377a.zip | |
Improve EC shards rebalancing logic across nodes (#6297)
* Improve EC shards rebalancing logic across nodes (see the sketch after this list).
- Favor target nodes with fewer preexisting shards, to ensure a fair distribution.
- Randomize selection when multiple eligible target nodes are available.
- Add logic to account for replication settings when selecting target nodes (currently disabled).
* Fix minor test typo.
* Clarify internal error messages for `pickEcNodeToBalanceShardsInto()`.
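The selection rule this commit introduces — keep only the eligible destinations holding the fewest preexisting shards, then break ties at random — is small enough to sketch on its own. Below is a minimal, self-contained Go illustration; the `node` type, its fields, and `pickTarget` are simplified stand-ins for the actual `*EcNode` type and `pickEcNodeToBalanceShardsInto()` in the diff further down, not the SeaweedFS implementation itself.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand/v2"
)

// node is a simplified stand-in for SeaweedFS's *EcNode; only the fields
// the selection rule needs are modeled here.
type node struct {
	id        string
	freeSlots int // free EC shard slots on this node
	shards    int // preexisting shards of the volume being balanced
}

// pickTarget keeps only the eligible nodes holding the fewest preexisting
// shards, then picks one of them at random so repeated moves spread shards
// evenly instead of piling onto the first candidate.
func pickTarget(candidates []node, maxShardsPerNode int) (node, error) {
	var best []node
	bestShards := -1
	for _, n := range candidates {
		if n.freeSlots <= 0 || n.shards >= maxShardsPerNode {
			continue // ineligible: full, or already at its fair share
		}
		if bestShards == -1 || n.shards < bestShards {
			best, bestShards = nil, n.shards // less-loaded node found; restart the tie list
		}
		if n.shards == bestShards {
			best = append(best, n)
		}
	}
	if len(best) == 0 {
		return node{}, errors.New("no eligible destination node")
	}
	return best[rand.IntN(len(best))], nil
}

func main() {
	candidates := []node{
		{id: "dn-a", freeSlots: 4, shards: 1},
		{id: "dn-b", freeSlots: 3, shards: 0},
		{id: "dn-c", freeSlots: 0, shards: 0}, // skipped: no free slots
		{id: "dn-d", freeSlots: 5, shards: 0},
	}
	// dn-b and dn-d tie with zero shards; one is chosen at random.
	target, err := pickTarget(candidates, 2)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("move shard to %s\n", target.id)
}
```

Randomizing among ties matters because a deterministic pick (for example, always the first node in slice order) would funnel the shards of many volumes onto the same node.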
Diffstat (limited to 'weed/shell/command_ec_common.go')

| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | weed/shell/command_ec_common.go | 80 |

1 file changed, 59 insertions, 21 deletions
```diff
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 7328fffe7..f70bc0131 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -2,6 +2,7 @@ package shell
 
 import (
     "context"
+    "errors"
     "fmt"
     "math/rand/v2"
 
@@ -481,6 +482,7 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int
     })
 }
 
+// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
 func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
     // calculate average number of shards an ec rack should have for one volume
     averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
@@ -527,6 +529,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
     return nil
 }
 
+// TODO: Return an error with details upon failure to resolve a destination rack.
 func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) RackId {
     targets := []RackId{}
     targetShards := -1
@@ -575,10 +578,7 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
     for vid, locations := range vidLocations {
 
         // see the volume's shards are in how many racks, and how many in each rack
-        rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
-            shardBits := findEcVolumeShards(ecNode, vid)
-            return string(ecNode.rack), shardBits.ShardIdCount()
-        })
+        rackToShardCount := countShardsByRack(vid, locations)
         rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
             return string(ecNode.rack)
         })
@@ -711,37 +711,75 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
     return nil
 }
 
-func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcNode int) (*EcNode, error) {
+    if existingLocation == nil {
+        return nil, fmt.Errorf("INTERNAL: missing source nodes")
+    }
+    if len(possibleDestinations) == 0 {
+        return nil, fmt.Errorf("INTERNAL: missing destination nodes")
+    }
 
-    sortEcNodesByFreeslotsDescending(possibleDestinationEcNodes)
-    skipReason := ""
-    for _, destEcNode := range possibleDestinationEcNodes {
+    nodeShards := map[*EcNode]int{}
+    for _, node := range possibleDestinations {
+        nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
+    }
 
-        if destEcNode.info.Id == existingLocation.info.Id {
-            continue
+    targets := []*EcNode{}
+    targetShards := -1
+    for _, shards := range nodeShards {
+        if shards > targetShards {
+            targetShards = shards
         }
+    }
 
-        if destEcNode.freeEcSlot <= 0 {
-            skipReason += fmt.Sprintf(" Skipping %s because it has no free slots\n", destEcNode.info.Id)
+    details := ""
+    for _, node := range possibleDestinations {
+        if node.info.Id == existingLocation.info.Id {
             continue
         }
-        if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode {
-            skipReason += fmt.Sprintf(" Skipping %s because it %d >= avernageShards (%d)\n",
-                destEcNode.info.Id, findEcVolumeShards(destEcNode, vid).ShardIdCount(), averageShardsPerEcNode)
+        if node.freeEcSlot <= 0 {
+            details += fmt.Sprintf(" Skipped %s because it has no free slots\n", node.info.Id)
             continue
         }
 
-        fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
+        shards := nodeShards[node]
+        if replicaPlacement != nil && shards >= replicaPlacement.SameRackCount {
+            details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, replicaPlacement.SameRackCount)
+            continue
+        }
+        if shards >= averageShardsPerEcNode {
+            details += fmt.Sprintf(" Skipped %s because shards %d >= averageShards (%d)\n",
+                node.info.Id, shards, averageShardsPerEcNode)
+            continue
+        }
 
-        err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
-        if err != nil {
-            return err
+        if shards < targetShards {
+            // Favor nodes with fewer shards, to ensure a uniform distribution.
+            targets = nil
+            targetShards = shards
         }
+        if shards == targetShards {
+            targets = append(targets, node)
+        }
+    }
 
+    if len(targets) == 0 {
+        return nil, errors.New(details)
+    }
+    return targets[rand.IntN(len(targets))], nil
+}
+
+// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
+func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+    // TODO: consider volume replica info when balancing nodes
+    destNode, err := pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, nil, averageShardsPerEcNode)
+    if err != nil {
+        fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
        return nil
    }
-    fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, skipReason)
-    return nil
+
+    fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+    return moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destNode, applyBalancing)
 }
 
 func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
```