| author | Lisandro Pin <lisandro.pin@proton.ch> | 2024-11-21 17:46:24 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-11-21 08:46:24 -0800 |
| commit | ca499de1cbd43b8a1bcdcbf2d62d137a7557bbc6 (patch) | |
| tree | 75da93dfa4eb54036e88fef9d4371efe5a59ebd9 /weed/shell/command_ec_common.go | |
| parent | e56327e3b0c741a7cf3c09d766814a2168a2720f (diff) | |
| download | seaweedfs-ca499de1cbd43b8a1bcdcbf2d62d137a7557bbc6.tar.xz seaweedfs-ca499de1cbd43b8a1bcdcbf2d62d137a7557bbc6.zip | |
Improve EC shards rebalancing logic across racks (#6270)
- Favor target racks with fewer preexisting shards, to ensure a fair distribution (see the sketch after this list).
- Randomize the selection when multiple equally suitable target racks are available.
- Add logic to account for replication settings when selecting target racks (currently disabled).
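To make the selection rule concrete, here is a minimal, self-contained Go sketch of the same idea: skip racks without free EC slots, keep only the racks holding the fewest preexisting shards, and break ties randomly. The helper name `pickLeastLoadedRack` and the plain map inputs are hypothetical; the commit's actual implementation is `pickRackToBalanceShardsInto` in the diff below.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickLeastLoadedRack is a hypothetical stand-in for pickRackToBalanceShardsInto:
// it favors racks holding the fewest shards of the volume and breaks ties randomly.
func pickLeastLoadedRack(shardsPerRack map[string]int, freeSlotsPerRack map[string]int) string {
	fewest := -1
	var candidates []string
	for rack, shards := range shardsPerRack {
		if freeSlotsPerRack[rack] <= 0 {
			continue // rack has no free EC shard slots
		}
		if fewest == -1 || shards < fewest {
			fewest = shards
			candidates = candidates[:0] // a less loaded rack resets the candidate set
		}
		if shards == fewest {
			candidates = append(candidates, rack)
		}
	}
	if len(candidates) == 0 {
		return ""
	}
	return candidates[rand.Intn(len(candidates))] // randomize among equally loaded racks
}

func main() {
	shards := map[string]int{"rack1": 3, "rack2": 1, "rack3": 1}
	free := map[string]int{"rack1": 4, "rack2": 4, "rack3": 4}
	fmt.Println(pickLeastLoadedRack(shards, free)) // prints "rack2" or "rack3"
}
```

Randomizing among equally loaded racks avoids repeatedly funneling shards to whichever candidate happens to be visited first.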
Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 83 |
1 file changed, 63 insertions, 20 deletions
```diff
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 39c3f75c5..6e7ad7e43 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,6 +3,7 @@ package shell
 import (
 	"context"
 	"fmt"
+	"math/rand"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -473,16 +474,19 @@ func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
 	return nil
 }
 
-func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
+func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
+	return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
+		shardBits := findEcVolumeShards(ecNode, vid)
+		return string(ecNode.rack), shardBits.ShardIdCount()
+	})
+}
 
+func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
 	// calculate average number of shards an ec rack should have for one volume
 	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
 
 	// see the volume's shards are in how many racks, and how many in each rack
-	rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
-		shardBits := findEcVolumeShards(ecNode, vid)
-		return string(ecNode.rack), shardBits.ShardIdCount()
-	})
+	rackToShardCount := countShardsByRack(vid, locations)
 	rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
 		return string(ecNode.rack)
 	})
@@ -490,16 +494,18 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 	// ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
 	ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
 	for rackId, count := range rackToShardCount {
-		if count > averageShardsPerEcRack {
-			possibleEcNodes := rackEcNodesWithVid[rackId]
-			for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
-				ecShardsToMove[shardId] = ecNode
-			}
+		if count <= averageShardsPerEcRack {
+			continue
+		}
+		possibleEcNodes := rackEcNodesWithVid[rackId]
+		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
+			ecShardsToMove[shardId] = ecNode
 		}
 	}
 
 	for shardId, ecNode := range ecShardsToMove {
-		rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack)
+		// TODO: consider volume replica info when balancing racks
+		rackId := pickRackToBalanceShardsInto(racks, rackToShardCount, nil, averageShardsPerEcRack)
 		if rackId == "" {
 			fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
 			continue
@@ -521,23 +527,44 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 	return nil
 }
 
-func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId {
-
-	// TODO later may need to add some randomness
+func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) RackId {
+	targets := []RackId{}
+	targetShards := -1
+	for _, shards := range rackToShardCount {
+		if shards > targetShards {
+			targetShards = shards
+		}
+	}
 
 	for rackId, rack := range rackToEcNodes {
-		if rackToShardCount[string(rackId)] >= averageShardsPerEcRack {
-			continue
-		}
+		shards := rackToShardCount[string(rackId)]
 
 		if rack.freeEcSlot <= 0 {
+			// No EC shards slots left :(
 			continue
 		}
-
-		return rackId
+		if replicaPlacement != nil && shards >= replicaPlacement.DiffRackCount {
+			// Don't select racks with more EC shards for the target volume than the replicaton limit.
+			continue
+		}
+		if shards >= averageShardsPerEcRack {
+			// Keep EC shards across racks as balanced as possible.
+			continue
+		}
+		if shards < targetShards {
+			// Favor racks with less shards, to ensure an uniform distribution.
+			targets = nil
+			targetShards = shards
+		}
+		if shards == targetShards {
+			targets = append(targets, rackId)
+		}
 	}
 
-	return ""
+	if len(targets) == 0 {
+		return ""
+	}
+	return targets[rand.Intn(len(targets))]
 }
 
 func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
@@ -774,6 +801,7 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
 	return vidLocations
 }
 
+// TODO: EC volumes have no replica placement info :( Maybe rely on the master's default?
 func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
 	for _, ecNode := range nodes {
 		for _, diskInfo := range ecNode.info.DiskInfos {
@@ -789,6 +817,21 @@ func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_bl
 	return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
 }
 
+func getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
+	var resp *master_pb.GetMasterConfigurationResponse
+	var err error
+
+	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+		resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+		return err
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
+}
+
 func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) {
 	if len(collections) == 0 {
 		return fmt.Errorf("no collections to balance")
```
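The replica-placement check introduced above is currently disabled: doBalanceEcShardsAcrossRacks passes nil to pickRackToBalanceShardsInto, and the new getDefaultReplicaPlacement helper is not yet called anywhere. A hedged sketch of how the two pieces might eventually be wired together inside doBalanceEcShardsAcrossRacks follows; this wiring is not part of the commit, and it assumes the surrounding variables and error handling from the diff above.

```go
// Hypothetical wiring, not part of this commit: EC volumes carry no replica
// placement info, so fall back to the master's default and pass it to the
// rack picker instead of nil.
rp, err := getDefaultReplicaPlacement(commandEnv)
if err != nil {
	return err
}
for shardId, ecNode := range ecShardsToMove {
	rackId := pickRackToBalanceShardsInto(racks, rackToShardCount, rp, averageShardsPerEcRack)
	if rackId == "" {
		fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
		continue
	}
	// ... move the shard to the chosen rack as in the existing loop body ...
}
```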
