Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 126 |
1 file changed, 73 insertions, 53 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index f2cc581da..bce0141f2 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura
 
 }
 
-func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
 	// list all possible locations
 	// collect topology information
 	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN
 	}
 
 	// find out all volume servers with one slot left.
-	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
+	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType)
 
 	sortEcNodesByFreeslotsDescending(ecNodes)
 
 	return
 }
 
-func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
-	return collectEcNodesForDC(commandEnv, "")
+func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+	return collectEcNodesForDC(commandEnv, "", diskType)
 }
 
 func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
@@ -242,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
 	return collections
 }
 
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool, diskType types.DiskType) (err error) {
 
 	if !commandEnv.isLocked() {
 		return fmt.Errorf("lock is lost")
@@ -280,8 +280,8 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
 
 	}
 
-	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
-	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
+	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds, diskType)
+	existingLocation.deleteEcVolumeShards(vid, copiedShardIds, diskType)
 
 	return nil
 
@@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
 	return 0
 }
 
-func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
+func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) {
 	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
 		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
 			return
 		}
 
-		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
+		freeEcSlots := countFreeShardSlots(dn, diskType)
 		ecNode := &EcNode{
 			info: dn,
 			dc:   dc,
@@ -439,17 +439,17 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
 		// Build disk-level information from volumes and EC shards
 		// First, discover all unique disk IDs from VolumeInfos (includes empty disks)
 		allDiskIds := make(map[uint32]string) // diskId -> diskType
-		for diskType, diskInfo := range dn.DiskInfos {
+		for diskTypeKey, diskInfo := range dn.DiskInfos {
 			if diskInfo == nil {
 				continue
 			}
 			// Get all disk IDs from volumes
 			for _, vi := range diskInfo.VolumeInfos {
-				allDiskIds[vi.DiskId] = diskType
+				allDiskIds[vi.DiskId] = diskTypeKey
 			}
 			// Also get disk IDs from EC shards
 			for _, ecShardInfo := range diskInfo.EcShardInfos {
-				allDiskIds[ecShardInfo.DiskId] = diskType
+				allDiskIds[ecShardInfo.DiskId] = diskTypeKey
 			}
 		}
 
@@ -476,7 +476,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
 		}
 		freePerDisk := int(freeEcSlots) / diskCount
 
-		for diskId, diskType := range allDiskIds {
+		for diskId, diskTypeStr := range allDiskIds {
 			shards := diskShards[diskId]
 			if shards == nil {
 				shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
@@ -488,7 +488,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
 			}
 			ecNode.disks[diskId] = &EcDisk{
 				diskId:       diskId,
-				diskType:     diskType,
+				diskType:     diskTypeStr,
 				freeEcSlots:  freePerDisk,
 				ecShardCount: totalShardCount,
 				ecShards:     shards,
@@ -551,9 +551,9 @@ func ceilDivide(a, b int) int {
 	return (a / b) + r
 }
 
-func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
+func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) erasure_coding.ShardBits {
 
-	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+	if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
 		for _, shardInfo := range diskInfo.EcShardInfos {
 			if needle.VolumeId(shardInfo.Id) == vid {
 				return erasure_coding.ShardBits(shardInfo.EcIndexBits)
@@ -564,10 +564,10 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
 	return 0
 }
 
-func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
+func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32, diskType types.DiskType) *EcNode {
 
 	foundVolume := false
-	diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+	diskInfo, found := ecNode.info.DiskInfos[string(diskType)]
 	if found {
 		for _, shardInfo := range diskInfo.EcShardInfos {
 			if needle.VolumeId(shardInfo.Id) == vid {
@@ -584,9 +584,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
 		}
 	} else {
 		diskInfo = &master_pb.DiskInfo{
-			Type: string(types.HardDriveType),
+			Type: string(diskType),
 		}
-		ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
+		ecNode.info.DiskInfos[string(diskType)] = diskInfo
 	}
 
 	if !foundVolume {
@@ -598,7 +598,7 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
 			Id:          uint32(vid),
 			Collection:  collection,
 			EcIndexBits: uint32(newShardBits),
-			DiskType:    string(types.HardDriveType),
+			DiskType:    string(diskType),
 		})
 		ecNode.freeEcSlot -= len(shardIds)
 	}
@@ -606,9 +606,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
 	return ecNode
 }
 
-func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
+func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode {
 
-	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+	if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
 		for _, shardInfo := range diskInfo.EcShardInfos {
 			if needle.VolumeId(shardInfo.Id) == vid {
 				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
@@ -649,6 +649,7 @@ type ecBalancer struct {
 	replicaPlacement   *super_block.ReplicaPlacement
 	applyBalancing     bool
 	maxParallelization int
+	diskType           types.DiskType // target disk type for EC shards (default: HardDriveType)
 }
 
 func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
@@ -705,7 +706,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
 	// Use MaxShardCount (32) to support custom EC ratios
 	shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount)
 	for _, ecNode := range locations {
-		shardBits := findEcVolumeShards(ecNode, vid)
+		shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
 		for _, shardId := range shardBits.ShardIds() {
 			shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
 		}
@@ -728,7 +729,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
 			if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
 				return err
 			}
-			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
+			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds, ecb.diskType)
 		}
 	}
 	return nil
@@ -748,9 +749,9 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
 	return ewg.Wait()
 }
 
-func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
+func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int {
 	return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
-		shardBits := findEcVolumeShards(ecNode, vid)
+		shardBits := findEcVolumeShards(ecNode, vid, diskType)
 		return string(ecNode.rack), shardBits.ShardIdCount()
 	})
 }
@@ -759,7 +760,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
 	racks := ecb.racks()
 
 	// see the volume's shards are in how many racks, and how many in each rack
-	rackToShardCount := countShardsByRack(vid, locations)
+	rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
 
 	// Calculate actual total shards for this volume (not hardcoded default)
 	var totalShardsForVolume int
@@ -779,7 +780,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
 			continue
 		}
 		possibleEcNodes := rackEcNodesWithVid[rackId]
-		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
+		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack, ecb.diskType) {
			ecShardsToMove[shardId] = ecNode
 		}
 	}
@@ -856,7 +857,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 	for vid, locations := range vidLocations {
 
 		// see the volume's shards are in how many racks, and how many in each rack
-		rackToShardCount := countShardsByRack(vid, locations)
+		rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
 		rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
 			return string(ecNode.rack)
 		})
@@ -865,7 +866,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 
 			var possibleDestinationEcNodes []*EcNode
 			for _, n := range racks[RackId(rackId)].ecNodes {
-				if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
+				if _, found := n.info.DiskInfos[string(ecb.diskType)]; found {
 					possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
 				}
 			}
@@ -882,7 +883,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
 func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
 	for _, ecNode := range existingLocations {
 
-		shardBits := findEcVolumeShards(ecNode, vid)
+		shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
 		overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
 
 		for _, shardId := range shardBits.ShardIds() {
@@ -927,7 +928,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 	}
 
 	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
-		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
 		if !found {
 			return
 		}
@@ -955,17 +956,18 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

			emptyNodeIds := make(map[uint32]bool)
-			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
+			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(ecb.diskType)]; found {
 				for _, shards := range emptyDiskInfo.EcShardInfos {
 					emptyNodeIds[shards.Id] = true
 				}
 			}
-			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
+			if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found {
 				for _, shards := range fullDiskInfo.EcShardInfos {
 					if _, found := emptyNodeIds[shards.Id]; !found {
 						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
 							vid := needle.VolumeId(shards.Id)
-							destDiskId := pickBestDiskOnNode(emptyNode, vid)
+							// For balancing, strictly require matching disk type
+							destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true)
 							if destDiskId > 0 {
 								fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
@@ -973,7 +975,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
 								fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
 							}

-							err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
+							err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType)
 							if err != nil {
 								return err
 							}
@@ -1003,7 +1005,7 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
 
 	nodeShards := map[*EcNode]int{}
 	for _, node := range possibleDestinations {
-		nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
+		nodeShards[node] = findEcVolumeShards(node, vid, ecb.diskType).ShardIdCount()
 	}
 
 	targets := []*EcNode{}
@@ -1078,14 +1080,17 @@ func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
 }
 
 // pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
-// It prefers disks with fewer shards and more free slots
-func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+// It prefers disks of the specified type with fewer shards and more free slots
+// If strictDiskType is false, it will fall back to other disk types if no matching disk is found
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType, strictDiskType bool) uint32 {
 	if len(ecNode.disks) == 0 {
 		return 0 // No disk info available, let the server decide
 	}
 
 	var bestDiskId uint32
 	bestScore := -1
+	var fallbackDiskId uint32
+	fallbackScore := -1
 
 	for diskId, disk := range ecNode.disks {
 		if disk.freeEcSlots <= 0 {
@@ -1102,13 +1107,26 @@ func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
 		// Lower score is better
 		score := disk.ecShardCount*10 + existingShards*100
 
-		if bestScore == -1 || score < bestScore {
-			bestScore = score
-			bestDiskId = diskId
+		if disk.diskType == string(diskType) {
+			// Matching disk type - this is preferred
+			if bestScore == -1 || score < bestScore {
+				bestScore = score
+				bestDiskId = diskId
+			}
+		} else if !strictDiskType {
+			// Non-matching disk type - use as fallback if allowed
+			if fallbackScore == -1 || score < fallbackScore {
+				fallbackScore = score
+				fallbackDiskId = diskId
+			}
 		}
 	}
 
-	return bestDiskId
+	// Return matching disk type if found, otherwise fallback
+	if bestDiskId != 0 {
+		return bestDiskId
+	}
+	return fallbackDiskId
 }
 
 // pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
@@ -1118,7 +1136,8 @@ func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId,
 		return nil, 0, err
 	}
 
-	diskId := pickBestDiskOnNode(node, vid)
+	// For balancing, strictly require matching disk type
+	diskId := pickBestDiskOnNode(node, vid, ecb.diskType, true)
 
 	return node, diskId, nil
 }
@@ -1134,14 +1153,14 @@ func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, co
 	} else {
 		fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
 	}
-	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
+	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing, ecb.diskType)
 }
 
-func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
+func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, diskType types.DiskType) map[erasure_coding.ShardId]*EcNode {
 	picked := make(map[erasure_coding.ShardId]*EcNode)
 	var candidateEcNodes []*CandidateEcNode
 	for _, ecNode := range ecNodes {
-		shardBits := findEcVolumeShards(ecNode, vid)
+		shardBits := findEcVolumeShards(ecNode, vid, diskType)
 		if shardBits.ShardIdCount() > 0 {
 			candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
 				ecNode:     ecNode,
@@ -1155,13 +1174,13 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
 	for i := 0; i < n; i++ {
 		selectedEcNodeIndex := -1
 		for i, candidateEcNode := range candidateEcNodes {
-			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
+			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid, diskType)
 			if shardBits > 0 {
 				selectedEcNodeIndex = i
 				for _, shardId := range shardBits.ShardIds() {
 					candidateEcNode.shardCount--
 					picked[shardId] = candidateEcNode.ecNode
-					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
+					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}, diskType)
 					break
 				}
 				break
@@ -1180,7 +1199,7 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
 func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
 	vidLocations := make(map[needle.VolumeId][]*EcNode)
 	for _, ecNode := range ecb.ecNodes {
-		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+		diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
 		if !found {
 			continue
 		}
@@ -1194,9 +1213,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
 	return vidLocations
 }
 
-func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
+func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) {
 	// collect all ec nodes
-	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
+	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType)
 	if err != nil {
 		return err
 	}
@@ -1210,6 +1229,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
 		replicaPlacement:   ecReplicaPlacement,
 		applyBalancing:     applyBalancing,
 		maxParallelization: maxParallelization,
+		diskType:           diskType,
 	}
 
 	if len(collections) == 0 {
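The change threads an explicit EC disk type through the balancing helpers instead of hard-coding types.HardDriveType. As a usage illustration only, the sketch below shows how a caller might derive the new diskType argument of EcBalance from a string value such as a CLI flag; the wrapper name and the flag-driven wiring are assumptions, not part of this commit, and it relies on types.ToDiskType treating an empty string as the hard-drive default.

// Hypothetical wrapper (not in this diff): supply EcBalance's new diskType
// parameter from a string value, defaulting to the hard-drive type.
func runEcBalanceForDiskType(commandEnv *CommandEnv, collections []string, dc string,
	rp *super_block.ReplicaPlacement, diskTypeFlag string, maxParallelization int, apply bool) error {

	// Assumption: types.ToDiskType maps "" (and "hdd") to types.HardDriveType,
	// matching the new struct field comment "default: HardDriveType".
	diskType := types.ToDiskType(diskTypeFlag)

	// Every helper touched by this commit (collectEcNodesForDC, findEcVolumeShards,
	// addEcVolumeShards, deleteEcVolumeShards, pickBestDiskOnNode, ...) now receives
	// this value instead of assuming types.HardDriveType.
	return EcBalance(commandEnv, collections, dc, rp, diskType, maxParallelization, apply)
}

On the per-disk selection in pickBestDiskOnNode: with score := disk.ecShardCount*10 + existingShards*100 and lower being better, a disk that already holds a shard of the volume scores at least 100, so a busier disk with no shard of that volume (say ecShardCount 5, score 50) is still preferred; after this change that comparison applies only to disks of the requested type, with other types considered solely when strictDiskType is false.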
