path: root/weed/shell/command_ec_common.go
author    Lisandro Pin <lisandro.pin@proton.ch>  2024-11-21 17:46:24 +0100
committer GitHub <noreply@github.com>  2024-11-21 08:46:24 -0800
commit    ca499de1cbd43b8a1bcdcbf2d62d137a7557bbc6 (patch)
tree      75da93dfa4eb54036e88fef9d4371efe5a59ebd9 /weed/shell/command_ec_common.go
parent    e56327e3b0c741a7cf3c09d766814a2168a2720f (diff)
Improve EC shards rebalancing logic across racks (#6270)
Improve EC shards rebalancing logic across racks.

- Favor target racks with fewer preexisting shards, to ensure a fair distribution.
- Randomize selection when multiple possible targets are available.
- Add logic to account for replication settings when selecting target racks (currently disabled).
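The heart of the change is the selection policy: walk all candidate racks, keep only those tied for the fewest preexisting shards of the volume, then pick one of the ties at random. Below is a minimal, self-contained Go sketch of that policy; the pickLeastLoaded helper and the sample rack data are illustrative only, not part of the commit (the real implementation is pickRackToBalanceShardsInto in the diff below).

package main

import (
	"fmt"
	"math/rand"
)

// pickLeastLoaded sketches the selection policy from this commit: skip
// candidates with no free slots, keep only those tied for the fewest
// preexisting shards, then choose among the ties at random.
func pickLeastLoaded(shardCounts map[string]int, freeSlots map[string]int) string {
	targets := []string{}
	targetShards := -1
	// Seed the threshold with the maximum count, so the first eligible
	// candidate always becomes the initial target.
	for _, n := range shardCounts {
		if n > targetShards {
			targetShards = n
		}
	}
	for id, n := range shardCounts {
		if freeSlots[id] <= 0 {
			continue // no capacity left on this rack
		}
		if n < targetShards {
			targets = nil // strictly better candidate found; drop earlier ties
			targetShards = n
		}
		if n == targetShards {
			targets = append(targets, id)
		}
	}
	if len(targets) == 0 {
		return ""
	}
	return targets[rand.Intn(len(targets))]
}

func main() {
	shardCounts := map[string]int{"rack1": 4, "rack2": 1, "rack3": 1}
	freeSlots := map[string]int{"rack1": 8, "rack2": 8, "rack3": 8}
	fmt.Println(pickLeastLoaded(shardCounts, freeSlots)) // rack2 or rack3, at random
}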
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--  weed/shell/command_ec_common.go  83
1 file changed, 63 insertions(+), 20 deletions(-)
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 39c3f75c5..6e7ad7e43 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,6 +3,7 @@ package shell
 import (
 	"context"
 	"fmt"
+	"math/rand"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -473,16 +474,19 @@ func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
 	return nil
 }
 
-func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
+func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
+	return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
+		shardBits := findEcVolumeShards(ecNode, vid)
+		return string(ecNode.rack), shardBits.ShardIdCount()
+	})
+}
+func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
 	// calculate average number of shards an ec rack should have for one volume
 	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
 
 	// see the volume's shards are in how many racks, and how many in each rack
-	rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
-		shardBits := findEcVolumeShards(ecNode, vid)
-		return string(ecNode.rack), shardBits.ShardIdCount()
-	})
+	rackToShardCount := countShardsByRack(vid, locations)
 
 	rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
 		return string(ecNode.rack)
 	})
@@ -490,16 +494,18 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 	// ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
 	ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
 	for rackId, count := range rackToShardCount {
-		if count > averageShardsPerEcRack {
-			possibleEcNodes := rackEcNodesWithVid[rackId]
-			for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
-				ecShardsToMove[shardId] = ecNode
-			}
+		if count <= averageShardsPerEcRack {
+			continue
+		}
+		possibleEcNodes := rackEcNodesWithVid[rackId]
+		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
+			ecShardsToMove[shardId] = ecNode
 		}
 	}
 
 	for shardId, ecNode := range ecShardsToMove {
-		rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack)
+		// TODO: consider volume replica info when balancing racks
+		rackId := pickRackToBalanceShardsInto(racks, rackToShardCount, nil, averageShardsPerEcRack)
 		if rackId == "" {
 			fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
 			continue
@@ -521,23 +527,44 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 	return nil
 }
 
-func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId {
-
-	// TODO later may need to add some randomness
+func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) RackId {
+	targets := []RackId{}
+	targetShards := -1
+	for _, shards := range rackToShardCount {
+		if shards > targetShards {
+			targetShards = shards
+		}
+	}
 
 	for rackId, rack := range rackToEcNodes {
-		if rackToShardCount[string(rackId)] >= averageShardsPerEcRack {
-			continue
-		}
+		shards := rackToShardCount[string(rackId)]
 
 		if rack.freeEcSlot <= 0 {
+			// No EC shard slots left :(
 			continue
 		}
-
-		return rackId
+		if replicaPlacement != nil && shards >= replicaPlacement.DiffRackCount {
+			// Don't select racks with more EC shards for the target volume than the replication limit.
+			continue
+		}
+		if shards >= averageShardsPerEcRack {
+			// Keep EC shards across racks as balanced as possible.
+			continue
+		}
+		if shards < targetShards {
+			// Favor racks with fewer shards, to ensure a uniform distribution.
+			targets = nil
+			targetShards = shards
+		}
+		if shards == targetShards {
+			targets = append(targets, rackId)
+		}
 	}
 
-	return ""
+	if len(targets) == 0 {
+		return ""
+	}
+	return targets[rand.Intn(len(targets))]
 }
 
 func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
@@ -774,6 +801,7 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
 	return vidLocations
 }
 
+// TODO: EC volumes have no replica placement info :( Maybe rely on the master's default?
 func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
 	for _, ecNode := range nodes {
 		for _, diskInfo := range ecNode.info.DiskInfos {
@@ -789,6 +817,21 @@ func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_bl
 	return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
 }
 
+func getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
+	var resp *master_pb.GetMasterConfigurationResponse
+	var err error
+
+	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+		resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+		return err
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
+}
+
 func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) {
 	if len(collections) == 0 {
 		return fmt.Errorf("no collections to balance")
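The diff is truncated here. Note that getDefaultReplicaPlacement is added above but, per the TODO, nothing feeds its result into pickRackToBalanceShardsInto yet; the call site still passes nil. One plausible wiring, sketched purely as an illustration (pickRackWithDefaultPlacement is a hypothetical helper, not part of this commit), would be:

// Hypothetical helper, not part of this commit: fetch the master's default
// replica placement once, then use it to bound rack selection instead of nil.
func pickRackWithDefaultPlacement(commandEnv *CommandEnv, racks map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) (RackId, error) {
	replicaPlacement, err := getDefaultReplicaPlacement(commandEnv)
	if err != nil {
		return "", err
	}
	return pickRackToBalanceShardsInto(racks, rackToShardCount, replicaPlacement, averageShardsPerEcRack), nil
}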