| field | value | date |
|---|---|---|
| author | Lisandro Pin <lisandro.pin@proton.ch> | 2024-12-05 18:00:46 +0100 |
| committer | GitHub <noreply@github.com> | 2024-12-05 09:00:46 -0800 |
| commit | 34cdbdd2795d4c89930066eb0c07751201f683fc (patch) | |
| tree | 88c7b5b75515458ea20250d29a3d7ae9f2088ad6 /weed/shell/command_ec_common.go | |
| parent | edef48533361dee2ae598782b85e233cc8110e50 (diff) | |
| download | seaweedfs-34cdbdd2795d4c89930066eb0c07751201f683fc.tar.xz, seaweedfs-34cdbdd2795d4c89930066eb0c07751201f683fc.zip | |
Share common parameters for EC re-balancing functions under a single struct. (#6319)
TODO cleanup for https://github.com/seaweedfs/seaweedfs/discussions/6179.
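The change is essentially the "parameter object" refactoring: state that was threaded through every EC re-balancing helper (the command environment, the EC node list, the replica placement, and the dry-run flag) now lives in one `ecBalancer` struct, and the helpers become methods on it. Below is a minimal, self-contained Go sketch of the pattern; the simplified types and field values are illustrative stand-ins, not the actual SeaweedFS definitions.

```go
package main

import "fmt"

// Simplified stand-ins for the real CommandEnv, EcNode, and ReplicaPlacement types.
type commandEnv struct{ masterAddress string }
type ecNode struct{ id string }
type replicaPlacement struct{ diffRackCount int }

// ecBalancer gathers the parameters that were previously passed to every helper.
type ecBalancer struct {
	commandEnv       *commandEnv
	ecNodes          []*ecNode
	replicaPlacement *replicaPlacement
	applyBalancing   bool // false = dry run, only report planned moves
}

// balanceCollection is a stand-in for balanceEcVolumes: it reads the shared
// fields instead of taking them all as arguments.
func (ecb *ecBalancer) balanceCollection(collection string) error {
	fmt.Printf("balancing %q across %d nodes (apply=%v, diffRackCount=%d)\n",
		collection, len(ecb.ecNodes), ecb.applyBalancing, ecb.replicaPlacement.diffRackCount)
	return nil
}

func main() {
	ecb := &ecBalancer{
		commandEnv:       &commandEnv{masterAddress: "localhost:9333"},
		ecNodes:          []*ecNode{{id: "volume-1"}, {id: "volume-2"}},
		replicaPlacement: &replicaPlacement{diffRackCount: 1},
		applyBalancing:   false,
	}
	for _, c := range []string{"images", "logs"} {
		if err := ecb.balanceCollection(c); err != nil {
			fmt.Println(err)
		}
	}
}
```

Trimming the signatures also makes the dry-run behaviour harder to get wrong: every step consults the same shared `applyBalancing` field rather than its own copy of the flag.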
Diffstat (limited to 'weed/shell/command_ec_common.go')
| mode | file | changes |
|---|---|---|
| -rw-r--r-- | weed/shell/command_ec_common.go | 118 |
1 file changed, 63 insertions(+), 55 deletions(-)
```diff
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 96cfc40db..e91656931 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -39,8 +39,6 @@ type EcRack struct {
 	freeEcSlot int
 }
 
-// TODO: We're shuffling way too many parameters between internal functions. Encapsulate in a ecBalancer{} struct.
-
 var (
 	// Overridable functions for testing.
 	getDefaultReplicaPlacement = _getDefaultReplicaPlacement
@@ -421,10 +419,16 @@ func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string]
 	return groupMap
 }
 
-func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
-	// collect racks info
+type ecBalancer struct {
+	commandEnv       *CommandEnv
+	ecNodes          []*EcNode
+	replicaPlacement *super_block.ReplicaPlacement
+	applyBalancing   bool
+}
+
+func (ecb *ecBalancer) racks() map[RackId]*EcRack {
 	racks := make(map[RackId]*EcRack)
-	for _, ecNode := range allEcNodes {
+	for _, ecNode := range ecb.ecNodes {
 		if racks[ecNode.rack] == nil {
 			racks[ecNode.rack] = &EcRack{
 				ecNodes: make(map[EcNodeId]*EcNode),
@@ -436,39 +440,38 @@ func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
 	return racks
 }
 
-func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
+func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
 	fmt.Printf("balanceEcVolumes %s\n", collection)
 
-	if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
+	if err := ecb.deleteDuplicatedEcShards(collection); err != nil {
 		return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
 	}
 
-	if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, rp, applyBalancing); err != nil {
+	if err := ecb.balanceEcShardsAcrossRacks(collection); err != nil {
 		return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
 	}
 
-	if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, rp, applyBalancing); err != nil {
+	if err := ecb.balanceEcShardsWithinRacks(collection); err != nil {
 		return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
 	}
 
 	return nil
 }
 
-func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
+func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
 	// vid => []ecNode
-	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
+	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
 	// deduplicate ec shards
 	for vid, locations := range vidLocations {
-		if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
+		if err := ecb.doDeduplicateEcShards(collection, vid, locations); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
-
+func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
 	// check whether this volume has ecNodes that are over average
 	shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
 	for _, ecNode := range locations {
@@ -483,16 +486,16 @@ func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle
 		}
 		sortEcNodesByFreeslotsAscending(ecNodes)
 		fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
-		if !applyBalancing {
+		if !ecb.applyBalancing {
 			continue
 		}
 
 		duplicatedShardIds := []uint32{uint32(shardId)}
 		for _, ecNode := range ecNodes[1:] {
-			if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
+			if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
 				return err
 			}
 
-			if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
+			if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
 				return err
 			}
 
 			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
@@ -501,12 +504,12 @@ func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle
 	return nil
 }
 
-func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
+func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
 	// collect vid => []ecNode, since previous steps can change the locations
-	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
+	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
 	// spread the ec shards evenly
 	for vid, locations := range vidLocations {
-		if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, rp, applyBalancing); err != nil {
+		if err := ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations); err != nil {
 			return err
 		}
 	}
@@ -521,7 +524,9 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int
 }
 
 // TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
-func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
+func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
+	racks := ecb.racks()
+
 	// calculate average number of shards an ec rack should have for one volume
 	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
 
@@ -544,7 +549,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 	}
 
 	for shardId, ecNode := range ecShardsToMove {
-		rackId, err := pickRackToBalanceShardsInto(racks, rackToShardCount, rp, averageShardsPerEcRack)
+		rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount, averageShardsPerEcRack)
 		if err != nil {
 			fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
 			continue
@@ -554,7 +559,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 		for _, n := range racks[rackId].ecNodes {
 			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
 		}
-		err = pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, rp, applyBalancing)
+		err = ecb.pickOneEcNodeAndMoveOneShard(averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes)
 		if err != nil {
 			return err
 		}
@@ -567,7 +572,7 @@ func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid
 	return nil
 }
 
-func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcRack int) (RackId, error) {
+func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) (RackId, error) {
 	targets := []RackId{}
 	targetShards := -1
 	for _, shards := range rackToShardCount {
@@ -584,8 +589,8 @@ func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCo
 			details += fmt.Sprintf(" Skipped %s because it has no free slots\n", rackId)
 			continue
 		}
-		if replicaPlacement != nil && shards >= replicaPlacement.DiffRackCount {
-			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, replicaPlacement.DiffRackCount)
+		if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.DiffRackCount {
+			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount)
 			continue
 		}
 		if shards >= averageShardsPerEcRack {
@@ -608,9 +613,10 @@ func pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCo
 	return targets[rand.IntN(len(targets))], nil
 }
 
-func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
+func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 	// collect vid => []ecNode, since previous steps can change the locations
-	vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
+	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
+	racks := ecb.racks()
 
 	// spread the ec shards evenly
 	for vid, locations := range vidLocations {
@@ -631,7 +637,7 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
 			}
 			sourceEcNodes := rackEcNodesWithVid[rackId]
 			averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
-			if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, rp, applyBalancing); err != nil {
+			if err := ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes); err != nil {
 				return err
 			}
 		}
@@ -639,8 +645,7 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra
 	return nil
 }
 
-func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
-
+func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
 	for _, ecNode := range existingLocations {
 
 		shardBits := findEcVolumeShards(ecNode, vid)
@@ -654,7 +659,7 @@ func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNo
 
 			fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
 
-			err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, rp, applyBalancing)
+			err := ecb.pickOneEcNodeAndMoveOneShard(averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes)
 			if err != nil {
 				return err
 			}
@@ -666,19 +671,17 @@ func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNo
 	return nil
 }
 
-func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
-
+func (ecb *ecBalancer) balanceEcRacks() error {
 	// balance one rack for all ec shards
-	for _, ecRack := range racks {
-		if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
+	for _, ecRack := range ecb.racks() {
+		if err := ecb.doBalanceEcRack(ecRack); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
-
+func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 	if len(ecRack.ecNodes) <= 1 {
 		return nil
 	}
@@ -729,7 +732,7 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
 
 				fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
 
-				err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
+				err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
 				if err != nil {
 					return err
 				}
@@ -749,7 +752,7 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
 	return nil
 }
 
-func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, replicaPlacement *super_block.ReplicaPlacement, averageShardsPerEcNode int) (*EcNode, error) {
+func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode, averageShardsPerEcNode int) (*EcNode, error) {
 	if existingLocation == nil {
 		return nil, fmt.Errorf("INTERNAL: missing source nodes")
 	}
@@ -781,8 +784,8 @@ func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode
 		}
 
 		shards := nodeShards[node]
-		if replicaPlacement != nil && shards >= replicaPlacement.SameRackCount {
-			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, replicaPlacement.SameRackCount)
+		if ecb.replicaPlacement != nil && shards >= ecb.replicaPlacement.SameRackCount {
+			details += fmt.Sprintf(" Skipped %s because shards %d >= replica placement limit for the rack (%d)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount)
 			continue
 		}
 		if shards >= averageShardsPerEcNode {
@@ -808,15 +811,15 @@ func pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode
 }
 
 // TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
-func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, rp *super_block.ReplicaPlacement, applyBalancing bool) error {
-	destNode, err := pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, rp, averageShardsPerEcNode)
+func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
+	destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes, averageShardsPerEcNode)
 	if err != nil {
 		fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
 		return nil
 	}
 
 	fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
-	return moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destNode, applyBalancing)
+	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
 }
 
 func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
@@ -859,9 +862,9 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
 	return picked
 }
 
-func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needle.VolumeId][]*EcNode {
+func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
 	vidLocations := make(map[needle.VolumeId][]*EcNode)
-	for _, ecNode := range allEcNodes {
+	for _, ecNode := range ecb.ecNodes {
 		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
 		if !found {
 			continue
@@ -876,9 +879,9 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
 	return vidLocations
 }
 
-// TODO: EC volumes have no topology replica placement info :( We need a better solution to resolve topology, and balancing, for those.
-func volumeIdToReplicaPlacement(commandEnv *CommandEnv, vid needle.VolumeId, nodes []*EcNode, ecReplicaPlacement *super_block.ReplicaPlacement) (*super_block.ReplicaPlacement, error) {
-	for _, ecNode := range nodes {
+// TODO: Unused, delete me.
+func (ecb *ecBalancer) volumeIdToReplicaPlacement(vid needle.VolumeId) (*super_block.ReplicaPlacement, error) {
+	for _, ecNode := range ecb.ecNodes {
 		for _, diskInfo := range ecNode.info.DiskInfos {
 			for _, volumeInfo := range diskInfo.VolumeInfos {
 				if needle.VolumeId(volumeInfo.Id) == vid {
@@ -887,7 +890,7 @@ func volumeIdToReplicaPlacement(commandEnv *CommandEnv, vid needle.VolumeId, nod
 			}
 			for _, ecShardInfo := range diskInfo.EcShardInfos {
 				if needle.VolumeId(ecShardInfo.Id) == vid {
-					return ecReplicaPlacement, nil
+					return ecb.replicaPlacement, nil
 				}
 			}
 		}
@@ -910,14 +913,19 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
 		return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
 	}
 
-	racks := collectRacks(allEcNodes)
+	ecb := &ecBalancer{
+		commandEnv:       commandEnv,
+		ecNodes:          allEcNodes,
+		replicaPlacement: ecReplicaPlacement,
+		applyBalancing:   applyBalancing,
+	}
+
 	for _, c := range collections {
-		if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, ecReplicaPlacement, applyBalancing); err != nil {
+		if err = ecb.balanceEcVolumes(c); err != nil {
 			return err
 		}
 	}
-
-	if err := balanceEcRacks(commandEnv, racks, applyBalancing); err != nil {
+	if err := ecb.balanceEcRacks(); err != nil {
 		return fmt.Errorf("balance ec racks: %v", err)
 	}
```
