| | | |
|---|---|---|
| author | Lisandro Pin <lisandro.pin@proton.ch> | 2024-12-10 22:30:13 +0100 |
| committer | GitHub <noreply@github.com> | 2024-12-10 13:30:13 -0800 |
| commit | 8c82c037b9b99b5514010267755745ca19eae8f4 (patch) | |
| tree | 9da8a041ac9e0fe16e2edd2291c6a05ae048087b /weed/shell/command_ec_common.go | |
| parent | ff1392f7f4a854d17dea13f6738a219d7bb2cf96 (diff) | |
| download | seaweedfs-8c82c037b9b99b5514010267755745ca19eae8f4.tar.xz, seaweedfs-8c82c037b9b99b5514010267755745ca19eae8f4.zip | |
Unify the re-balancing logic for `ec.encode` with `ec.balance`. (#6339)
Among other things, this enables the recent topology-aware re-balancing changes at EC encoding time.
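
In practice, this means `ec.encode` can delegate placement to the same `EcBalance()` entry point that `ec.balance` uses. Below is a hedged sketch of that call path, assuming the `EcBalance` signature visible in the hunks further down (a replica placement plus an apply flag); the wrapper name `rebalanceAfterEncode` is hypothetical and not part of this commit:

```go
// Hypothetical wrapper, for illustration only: after `ec.encode` finishes
// encoding volumes, it can hand off to the shared EcBalance() entry point
// instead of keeping its own placement logic.
func rebalanceAfterEncode(commandEnv *CommandEnv, collections []string, dc string) error {
	// An empty replica string is assumed to fall back to the master's
	// default placement (see parseReplicaPlacementArg in the diff below).
	rp, err := parseReplicaPlacementArg(commandEnv, "")
	if err != nil {
		return err
	}
	// applyBalancing=true moves shards for real; false would be a dry run.
	return EcBalance(commandEnv, collections, dc, rp, true)
}
```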
Diffstat (limited to 'weed/shell/command_ec_common.go')
| | | |
|---|---|---|
| -rw-r--r-- | weed/shell/command_ec_common.go | 126 |
1 file changed, 108 insertions, 18 deletions
```diff
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 625674bfd..4b672b2ef 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand/v2"
+	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -40,6 +41,72 @@ type EcRack struct {
 }
 
 var (
+	ecBalanceAlgorithmDescription = `
+	func EcBalance() {
+		for each collection:
+			balanceEcVolumes(collectionName)
+		for each rack:
+			balanceEcRack(rack)
+	}
+
+	func balanceEcVolumes(collectionName){
+		for each volume:
+			doDeduplicateEcShards(volumeId)
+
+		tracks rack~shardCount mapping
+		for each volume:
+			doBalanceEcShardsAcrossRacks(volumeId)
+
+		for each volume:
+			doBalanceEcShardsWithinRacks(volumeId)
+	}
+
+	// spread ec shards into more racks
+	func doBalanceEcShardsAcrossRacks(volumeId){
+		tracks rack~volumeIdShardCount mapping
+		averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
+		ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
+		for each ecShardsToMove {
+			destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
+			destVolumeServers = volume servers on the destRack
+			pickOneEcNodeAndMoveOneShard(destVolumeServers)
+		}
+	}
+
+	func doBalanceEcShardsWithinRacks(volumeId){
+		racks = collect all racks that the volume id is on
+		for rack, shards := range racks
+			doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
+	}
+
+	// move ec shards
+	func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
+		tracks volumeServer~volumeIdShardCount mapping
+		averageShardCount = len(shards) / numVolumeServers
+		volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
+		ecShardsToMove = select overflown ec shards from volumeServersOverAverage
+		for each ecShardsToMove {
+			destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
+			pickOneEcNodeAndMoveOneShard(destVolumeServers)
+		}
+	}
+
+	// move ec shards while keeping shard distribution for the same volume unchanged or more even
+	func balanceEcRack(rack){
+		averageShardCount = total shards / numVolumeServers
+		for hasMovedOneEcShard {
+			sort all volume servers ordered by the number of local ec shards
+			pick the volume server A with the lowest number of ec shards x
+			pick the volume server B with the highest number of ec shards y
+			if y > averageShardCount and x +1 <= averageShardCount {
+				if B has a ec shard with volume id v that A does not have {
+					move one ec shard v from B to A
+					hasMovedOneEcShard = true
+				}
+			}
+		}
+	}
+	`
 	// Overridable functions for testing.
 	getDefaultReplicaPlacement = _getDefaultReplicaPlacement
 )
@@ -58,6 +125,7 @@ func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPl
 	return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
 }
 
+
 func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
 	if replicaStr != "" {
 		rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
@@ -75,6 +143,45 @@ func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super
 	return rp, err
 }
 
+func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
+
+	if delayBeforeCollecting > 0 {
+		time.Sleep(delayBeforeCollecting)
+	}
+
+	var resp *master_pb.VolumeListResponse
+	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+		return err
+	})
+	if err != nil {
+		return
+	}
+
+	return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
+
+}
+
+func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+	// list all possible locations
+	// collect topology information
+	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
+	if err != nil {
+		return
+	}
+
+	// find out all volume servers with one slot left.
+	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
+
+	sortEcNodesByFreeslotsDescending(ecNodes)
+
+	return
+}
+
+func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+	return collectEcNodesForDC(commandEnv, "")
+}
+
 func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
 
 	if !commandEnv.isLocked() {
@@ -243,22 +350,6 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
 	return 0
 }
 
-func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
-	// list all possible locations
-	// collect topology information
-	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
-	if err != nil {
-		return
-	}
-
-	// find out all volume servers with one slot left.
-	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
-
-	sortEcNodesByFreeslotsDescending(ecNodes)
-
-	return
-}
-
 func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
 	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
 		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
@@ -523,7 +614,6 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int
 	})
 }
 
-// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
 func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
 	racks := ecb.racks()
 
@@ -876,7 +966,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
 	}
 
 	// collect all ec nodes
-	allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, dc)
+	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
 	if err != nil {
 		return err
 	}
```
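
The `balanceEcRack` step in the pseudocode above is the least obvious part of the algorithm, so here is a minimal, self-contained Go sketch of the same greedy loop. The `server` type and `balanceRack` function are illustrative stand-ins, not SeaweedFS's actual `EcNode` implementation:

```go
package main

import (
	"fmt"
	"sort"
)

// server is a hypothetical stand-in for an EC node; it tracks how many
// shards of each volume it holds locally.
type server struct {
	name   string
	shards map[uint32]int // volume id -> local shard count
}

func (s *server) total() int {
	n := 0
	for _, c := range s.shards {
		n += c
	}
	return n
}

// balanceRack mirrors the balanceEcRack pseudocode: repeatedly move one
// shard from the fullest server (B) to the emptiest one (A), but only for
// a volume A does not already hold, so per-volume spread never degrades.
func balanceRack(servers []*server) {
	if len(servers) == 0 {
		return
	}
	totalShards := 0
	for _, s := range servers {
		totalShards += s.total()
	}
	average := totalShards / len(servers)

	for moved := true; moved; {
		moved = false
		sort.Slice(servers, func(i, j int) bool {
			return servers[i].total() < servers[j].total()
		})
		a, b := servers[0], servers[len(servers)-1] // emptiest and fullest
		if b.total() <= average || a.total()+1 > average {
			break // already as even as the averages constraint allows
		}
		for vid, count := range b.shards {
			if count > 0 && a.shards[vid] == 0 {
				b.shards[vid]--
				a.shards[vid]++
				moved = true
				break
			}
		}
	}
}

func main() {
	servers := []*server{
		{name: "vs1", shards: map[uint32]int{1: 5, 2: 5}},
		{name: "vs2", shards: map[uint32]int{3: 4}},
		{name: "vs3", shards: map[uint32]int{}},
	}
	balanceRack(servers)
	for _, s := range servers {
		fmt.Println(s.name, s.total(), s.shards)
	}
}
```

Note how the loop also stops as soon as the fullest server holds no volume the emptiest one lacks, so total shard counts may remain uneven; spreading a single volume's shards more widely is the job of the earlier `doBalanceEcShardsAcrossRacks` and `doBalanceEcShardsWithinRacks` passes.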
