path: root/weed/shell/command_ec_common.go
author    Lisandro Pin <lisandro.pin@proton.ch>  2024-12-10 22:30:13 +0100
committer GitHub <noreply@github.com>            2024-12-10 13:30:13 -0800
commit    8c82c037b9b99b5514010267755745ca19eae8f4 (patch)
tree      9da8a041ac9e0fe16e2edd2291c6a05ae048087b /weed/shell/command_ec_common.go
parent    ff1392f7f4a854d17dea13f6738a219d7bb2cf96 (diff)
Unify the re-balancing logic for `ec.encode` with `ec.balance`. (#6339)
Among other things, this enables the recent topology-aware re-balancing changes at EC encoding time.
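
As a rough illustration of what encoding-time callers gain, here is a minimal sketch of reusing the shared, topology-aware node collection from this file. Only collectEcNodesForDC and the EcNode type come from the diff below; the wrapper name pickEcRebalanceTargets and the use of erasure_coding.TotalShardsCount as a free-slot threshold are assumptions made for the example.

// Illustrative only; not part of this commit.
func pickEcRebalanceTargets(commandEnv *CommandEnv, dc string) ([]*EcNode, error) {
	// collectEcNodesForDC gathers EC-capable volume servers for one data
	// center (or all of them when dc is empty) and sorts them by free EC
	// slots, descending.
	ecNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
	if err != nil {
		return nil, err
	}
	// Assumption: require room for at least one full set of shards.
	if totalFreeEcSlots < erasure_coding.TotalShardsCount {
		return nil, fmt.Errorf("only %d free ec shard slots available", totalFreeEcSlots)
	}
	return ecNodes, nil
}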
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--  weed/shell/command_ec_common.go  126
1 file changed, 108 insertions, 18 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 625674bfd..4b672b2ef 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand/v2"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -40,6 +41,72 @@ type EcRack struct {
}
var (
+ ecBalanceAlgorithmDescription = `
+ func EcBalance() {
+ for each collection:
+ balanceEcVolumes(collectionName)
+ for each rack:
+ balanceEcRack(rack)
+ }
+
+ func balanceEcVolumes(collectionName){
+ for each volume:
+ doDeduplicateEcShards(volumeId)
+
+ tracks rack~shardCount mapping
+ for each volume:
+ doBalanceEcShardsAcrossRacks(volumeId)
+
+ for each volume:
+ doBalanceEcShardsWithinRacks(volumeId)
+ }
+
+ // spread ec shards into more racks
+ func doBalanceEcShardsAcrossRacks(volumeId){
+ tracks rack~volumeIdShardCount mapping
+ averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now; later it could vary per dc
+ ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
+ for each ecShardsToMove {
+ destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
+ destVolumeServers = volume servers on the destRack
+ pickOneEcNodeAndMoveOneShard(destVolumeServers)
+ }
+ }
+
+ func doBalanceEcShardsWithinRacks(volumeId){
+ racks = collect all racks that the volume id is on
+ for rack, shards := range racks
+ doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
+ }
+
+ // move ec shards
+ func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
+ tracks volumeServer~volumeIdShardCount mapping
+ averageShardCount = len(shards) / numVolumeServers
+ volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardCount
+ ecShardsToMove = select overflown ec shards from volumeServersOverAverage
+ for each ecShardsToMove {
+ destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
+ pickOneEcNodeAndMoveOneShard(destVolumeServer)
+ }
+ }
+
+ // move ec shards while keeping shard distribution for the same volume unchanged or more even
+ func balanceEcRack(rack){
+ averageShardCount = total shards / numVolumeServers
+ for hasMovedOneEcShard {
+ sort all volume servers ordered by the number of local ec shards
+ pick the volume server A with the lowest number of ec shards x
+ pick the volume server B with the highest number of ec shards y
+ if y > averageShardCount and x + 1 <= averageShardCount {
+ if B has an ec shard with volume id v that A does not have {
+ move one ec shard v from B to A
+ hasMovedOneEcShard = true
+ }
+ }
+ }
+ }
+ `
// Overridable functions for testing.
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
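
The across-racks step described above reduces to computing a per-rack average for a volume's shards and treating anything above it as movable. A self-contained toy sketch of that arithmetic, with illustrative names rather than the actual helpers in this file:

package main

import "fmt"

// pickOverflownShards returns, per rack, how many of a volume's ec shards
// exceed the per-rack average and are therefore candidates to move away.
// It mirrors the averageShardsPerEcRack idea in the description above;
// it is an illustration, not the code in command_ec_common.go.
func pickOverflownShards(shardsPerRack map[string]int, totalShards int) map[string]int {
	if len(shardsPerRack) == 0 {
		return nil
	}
	average := totalShards / len(shardsPerRack) // e.g. 14 shards over 3 racks -> 4
	overflow := make(map[string]int)
	for rack, n := range shardsPerRack {
		if n > average {
			overflow[rack] = n - average
		}
	}
	return overflow
}

func main() {
	// 14 shards of one volume spread unevenly over three racks.
	fmt.Println(pickOverflownShards(map[string]int{"rack1": 9, "rack2": 3, "rack3": 2}, 14))
	// prints: map[rack1:5] (rack1 holds 5 shards more than the average of 4)
}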
@@ -58,6 +125,7 @@ func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPl
return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}
+
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
if replicaStr != "" {
rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
@@ -75,6 +143,45 @@ func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super
return rp, err
}
+func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
+
+ if delayBeforeCollecting > 0 {
+ time.Sleep(delayBeforeCollecting)
+ }
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return
+ }
+
+ return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
+
+}
+
+func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+ // list all possible locations
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
+ if err != nil {
+ return
+ }
+
+ // find out all volume servers with one slot left.
+ ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
+
+ sortEcNodesByFreeslotsDescending(ecNodes)
+
+ return
+}
+
+func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+ return collectEcNodesForDC(commandEnv, "")
+}
+
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
@@ -243,22 +350,6 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
return 0
}
-func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
- // list all possible locations
- // collect topology information
- topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
- if err != nil {
- return
- }
-
- // find out all volume servers with one slot left.
- ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
-
- sortEcNodesByFreeslotsDescending(ecNodes)
-
- return
-}
-
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != string(dc) {
@@ -523,7 +614,6 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int
})
}
-// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
racks := ecb.racks()
@@ -876,7 +966,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
}
// collect all ec nodes
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, dc)
+ allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
if err != nil {
return err
}
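
For callers, the last hunk shows the only visible rename: the DC-scoped collector is now collectEcNodesForDC, while collectEcNodes without a data-center argument means "all data centers". A hedged sketch of how a caller could choose between them (the wrapper itself is not in the commit):

// Illustrative wrapper, not part of this commit.
func collectCandidates(commandEnv *CommandEnv, dc string) ([]*EcNode, int, error) {
	if dc == "" {
		// New convenience form, equivalent to collectEcNodesForDC(commandEnv, "").
		return collectEcNodes(commandEnv)
	}
	// Formerly spelled collectEcNodes(commandEnv, dc) before this commit.
	return collectEcNodesForDC(commandEnv, dc)
}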