author    Lisandro Pin <lisandro.pin@proton.ch>  2024-11-27 20:51:57 +0100
committer GitHub <noreply@github.com>  2024-11-27 11:51:57 -0800
commit    559a1fd0f4565bca3e2f4e6f0d90d188c7b3377a (patch)
tree      f02a7ffac330833ba36e8e5d006722f649387ac3 /weed/shell/command_ec_common.go
parent    88fa64a01ae7ac8782e70dd29a8a9f6bf44a7e19 (diff)
download  seaweedfs-559a1fd0f4565bca3e2f4e6f0d90d188c7b3377a.tar.xz
          seaweedfs-559a1fd0f4565bca3e2f4e6f0d90d188c7b3377a.zip
Improve EC shards rebalancing logic across nodes (#6297)
* Improve EC shards rebalancing logic across nodes.
  - Favor target nodes with less preexisting shards, to ensure a fair distribution.
  - Randomize selection when multiple possible target nodes are available.
  - Add logic to account for replication settings when selecting target nodes (currently disabled).
* Fix minor test typo.
* Clarify internal error messages for `pickEcNodeToBalanceShardsInto()`.
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--  weed/shell/command_ec_common.go  |  80
1 file changed, 59 insertions(+), 21 deletions(-)
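
The commit message above describes the new destination-node selection heuristic added in `pickEcNodeToBalanceShardsInto()`: among viable nodes, prefer those with the fewest preexisting shards of the volume and pick at random when several nodes tie. The following is a minimal, self-contained Go sketch of that idea; the candidate type, its fields, and pickCandidate are illustrative stand-ins, not the repository's actual EcNode type or helpers.

// Sketch of the "fewest shards wins, ties broken at random" selection heuristic.
// candidate and pickCandidate are hypothetical names used only for illustration.
package main

import (
    "errors"
    "fmt"
    "math/rand/v2"
)

type candidate struct {
    id         string
    freeSlots  int // free EC shard slots on the node
    shardCount int // shards of this volume already on the node
}

func pickCandidate(nodes []candidate, averageShardsPerNode int) (candidate, error) {
    var targets []candidate
    best := -1
    for _, n := range nodes {
        if n.freeSlots <= 0 || n.shardCount >= averageShardsPerNode {
            continue // not a viable destination
        }
        if best == -1 || n.shardCount < best {
            // A node holding fewer shards of the volume wins outright.
            targets = targets[:0]
            best = n.shardCount
        }
        if n.shardCount == best {
            targets = append(targets, n)
        }
    }
    if len(targets) == 0 {
        return candidate{}, errors.New("no viable destination node")
    }
    // Randomize among equally good nodes so repeated runs spread shards fairly.
    return targets[rand.IntN(len(targets))], nil
}

func main() {
    nodes := []candidate{
        {id: "nodeA", freeSlots: 4, shardCount: 2},
        {id: "nodeB", freeSlots: 0, shardCount: 0}, // skipped: no free slots
        {id: "nodeC", freeSlots: 3, shardCount: 1},
        {id: "nodeD", freeSlots: 5, shardCount: 1},
    }
    picked, err := pickCandidate(nodes, 3)
    if err != nil {
        fmt.Println("no destination:", err)
        return
    }
    fmt.Println("picked", picked.id) // nodeC or nodeD, chosen at random
}

In the actual function shown in the diff below, the per-node shard counts come from findEcVolumeShards(), and an additional replica-placement limit is checked (currently disabled by passing nil).
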
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 7328fffe7..f70bc0131 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -2,6 +2,7 @@ package shell
import (
"context"
+ "errors"
"fmt"
"math/rand/v2"
@@ -481,6 +482,7 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int
})
}
+// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
// calculate average number of shards an ec rack should have for one volume
averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
@@ -527,6 +529,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
return nil
}
+// TODO: Return an error with details upon failure to resolve a destination rack.
func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) RackId {
targets := []RackId{}
targetShards := -1
@@ -575,10 +578,7 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
for vid, locations := range vidLocations {
// see the volume's shards are in how many racks, and how many in each rack
- rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
- shardBits := findEcVolumeShards(ecNode, vid)
- return string(ecNode.rack), shardBits.ShardIdCount()
- })
+ rackToShardCount := countShardsByRack(vid, locations)
rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
return string(ecNode.rack)
})
@@ -711,37 +711,75 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
return nil
}
-func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcNode int) (*EcNode, error) {
+ if existingLocation == nil {
+ return nil, fmt.Errorf("INTERNAL: missing source nodes")
+ }
+ if len(possibleDestinations) == 0 {
+ return nil, fmt.Errorf("INTERNAL: missing destination nodes")
+ }
- sortEcNodesByFreeslotsDescending(possibleDestinationEcNodes)
- skipReason := ""
- for _, destEcNode := range possibleDestinationEcNodes {
+ nodeShards := map[*EcNode]int{}
+ for _, node := range possibleDestinations {
+ nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
+ }
- if destEcNode.info.Id == existingLocation.info.Id {
- continue
+ targets := []*EcNode{}
+ targetShards := -1
+ for _, shards := range nodeShards {
+ if shards > targetShards {
+ targetShards = shards
}
+ }
- if destEcNode.freeEcSlot <= 0 {
- skipReason += fmt.Sprintf(" Skipping %s because it has no free slots\n", destEcNode.info.Id)
+ details := ""
+ for _, node := range possibleDestinations {
+ if node.info.Id == existingLocation.info.Id {
continue
}
- if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode {
- skipReason += fmt.Sprintf(" Skipping %s because it %d >= avernageShards (%d)\n",
- destEcNode.info.Id, findEcVolumeShards(destEcNode, vid).ShardIdCount(), averageShardsPerEcNode)
+ if node.freeEcSlot <= 0 {
+ details += fmt.Sprintf(" Skipped %s because it has no free slots\n", node.info.Id)
continue
}
- fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
+ shards := nodeShards[node]
+ if replicaPlacement != nil && shards >= replicaPlacement.SameRackCount {
+ details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, replicaPlacement.SameRackCount)
+ continue
+ }
+ if shards >= averageShardsPerEcNode {
+ details += fmt.Sprintf(" Skipped %s because shards %d >= averageShards (%d)\n",
+ node.info.Id, shards, averageShardsPerEcNode)
+ continue
+ }
- err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
- if err != nil {
- return err
+ if shards < targetShards {
+ // Favor nodes with fewer shards, to ensure a uniform distribution.
+ targets = nil
+ targetShards = shards
}
+ if shards == targetShards {
+ targets = append(targets, node)
+ }
+ }
+ if len(targets) == 0 {
+ return nil, errors.New(details)
+ }
+ return targets[rand.IntN(len(targets))], nil
+}
+
+// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
+func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+ // TODO: consider volume replica info when balancing nodes
+ destNode, err := pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, nil, averageShardsPerEcNode)
+ if err != nil {
+ fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
return nil
}
- fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, skipReason)
- return nil
+
+ fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+ return moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destNode, applyBalancing)
}
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {