| author | Lisandro Pin <lisandro.pin@proton.ch> | 2024-11-19 03:05:06 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-11-18 18:05:06 -0800 |
| commit | f2db746690b903c50d512db568d739a91a00179e (patch) | |
| tree | 67b7b1f8ed852f46531714351494973e97f197a3 /weed/shell/command_ec_common.go | |
| parent | 7b3c0e937f83d3b49799b5d5dcb98b0043461c25 (diff) | |
| download | seaweedfs-f2db746690b903c50d512db568d739a91a00179e.tar.xz seaweedfs-f2db746690b903c50d512db568d739a91a00179e.zip | |
Introduce logic to resolve volume replica placement within EC rebalancing. (#6254)
* Rename `command_ec_encode_test.go` to `command_ec_common_test.go`.
All tests defined in this file are now for `command_ec_common.go`.
* Minor code cleanups.
- Fix broken `ec.balance` test.
- Rework integer ceiling division to not use floats, which can introduce precision errors (see the sketch after this list).
* Introduce logic to resolve volume replica placement within EC rebalancing.
This will be used to make the rebalancing logic topology-aware (see the usage sketch after the diff below).
* Give shell.EcNode.dc a dedicated DataCenterId type.
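
As a side note on the ceiling-division cleanup: below is a minimal, self-contained Go sketch of the float-free approach. The `ceilDivide` body mirrors the helper changed in this diff; the `main` function and the specific value `(1<<53)+1` are illustrative only and assume a 64-bit `int`.

```go
package main

import (
	"fmt"
	"math"
)

// ceilDivide returns the ceiling of a/b using integer arithmetic only,
// avoiding the int -> float64 -> int round-trip of math.Ceil.
func ceilDivide(a, b int) int {
	var r int
	if (a % b) != 0 {
		r = 1
	}
	return (a / b) + r
}

func main() {
	// Illustrative value (assumes 64-bit int): (1<<53)+1 is not exactly
	// representable as a float64, so the float-based version silently
	// rounds it down before Ceil even runs.
	a, b := (1<<53)+1, 1
	fmt.Println(int(math.Ceil(float64(a) / float64(b)))) // 9007199254740992 (off by one)
	fmt.Println(ceilDivide(a, b))                        // 9007199254740993 (exact)
}
```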
Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 70 |
1 file changed, 44 insertions, 26 deletions
```diff
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index fd7a1acdc..b98921fd7 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,7 +3,6 @@ package shell
 import (
     "context"
     "fmt"
-    "math"
 
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/operation"
@@ -12,11 +11,32 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
     "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
     "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+    "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
     "github.com/seaweedfs/seaweedfs/weed/storage/types"
     "golang.org/x/exp/slices"
     "google.golang.org/grpc"
 )
 
+type DataCenterId string
+type EcNodeId string
+type RackId string
+
+type EcNode struct {
+    info       *master_pb.DataNodeInfo
+    dc         DataCenterId
+    rack       RackId
+    freeEcSlot int
+}
+type CandidateEcNode struct {
+    ecNode     *EcNode
+    shardCount int
+}
+
+type EcRack struct {
+    ecNodes    map[EcNodeId]*EcNode
+    freeEcSlot int
+}
+
 func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
 
     if !commandEnv.isLocked() {
@@ -68,7 +88,6 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
 
     err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
         if targetAddress != existingLocation {
-
             fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
             _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
                 VolumeId: uint32(volumeId),
@@ -109,6 +128,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
     return
 }
 
+// TODO: Make dc a DataCenterId instead of string.
 func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
     for _, dc := range topo.DataCenterInfos {
         for _, rack := range dc.RackInfos {
@@ -131,11 +151,6 @@ func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
     })
 }
 
-type CandidateEcNode struct {
-    ecNode     *EcNode
-    shardCount int
-}
-
 // if the index node changed the freeEcSlot, need to keep every EcNode still sorted
 func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
     for i := index - 1; i >= 0; i-- {
@@ -179,16 +194,6 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (c
     return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
 }
 
-type RackId string
-type EcNodeId string
-
-type EcNode struct {
-    info       *master_pb.DataNodeInfo
-    dc         string
-    rack       RackId
-    freeEcSlot int
-}
-
 func (ecNode *EcNode) localShardIdCount(vid uint32) int {
     for _, diskInfo := range ecNode.info.DiskInfos {
         for _, ecShardInfo := range diskInfo.EcShardInfos {
@@ -201,13 +206,7 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
     return 0
 }
 
-type EcRack struct {
-    ecNodes    map[EcNodeId]*EcNode
-    freeEcSlot int
-}
-
 func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
-    // list all possible locations
     // collect topology information
     topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
 
@@ -232,7 +231,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
         freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
         ecNodes = append(ecNodes, &EcNode{
             info:       dn,
-            dc:         dc,
+            dc:         DataCenterId(dc),
             rack:       rack,
             freeEcSlot: int(freeEcSlots),
         })
@@ -283,8 +282,12 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
     })
 }
 
-func ceilDivide(total, n int) int {
-    return int(math.Ceil(float64(total) / float64(n)))
+func ceilDivide(a, b int) int {
+    var r int
+    if (a % b) != 0 {
+        r = 1
+    }
+    return (a / b) + r
 }
 
 func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
@@ -772,6 +775,21 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
     return vidLocations
 }
 
+func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
+    for _, ecNode := range nodes {
+        for _, diskInfo := range ecNode.info.DiskInfos {
+            for _, volumeInfo := range diskInfo.VolumeInfos {
+                if needle.VolumeId(volumeInfo.Id) != vid {
+                    continue
+                }
+                return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+            }
+        }
+    }
+
+    return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
+}
+
 func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) {
     if len(collections) == 0 {
         return fmt.Errorf("no collections to balance")
```
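
To make the intent of the new helper concrete, here is a hedged usage sketch of how `volumeIdToReplicaPlacement` could drive a topology-aware destination filter during EC rebalancing. Only `volumeIdToReplicaPlacement`, `EcNode`, `DataCenterId`, and `super_block.ReplicaPlacement` come from the change above; `pickDestinations` and its filtering rule are hypothetical and not part of this commit. The sketch assumes it sits alongside `command_ec_common.go` in the `shell` package.

```go
// pickDestinations is a hypothetical illustration of topology-aware filtering:
// it resolves the source volume's replica placement and, when the placement
// allows no cross-data-center replicas, keeps only candidates in the same
// data center as the source node.
func pickDestinations(vid needle.VolumeId, source *EcNode, candidates []*EcNode) ([]*EcNode, error) {
	rp, err := volumeIdToReplicaPlacement(vid, candidates)
	if err != nil {
		return nil, err
	}

	var picked []*EcNode
	for _, node := range candidates {
		if rp.DiffDataCenterCount == 0 && node.dc != source.dc {
			continue // placement forbids spreading across data centers
		}
		picked = append(picked, node)
	}
	return picked, nil
}
```

The `node.dc != source.dc` comparison is also where the dedicated `DataCenterId` type pays off: mixing it up with a plain string variable or a `RackId` no longer compiles.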
