path: root/weed/shell/command_ec_common.go
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--  weed/shell/command_ec_common.go  126
1 file changed, 73 insertions(+), 53 deletions(-)
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index f2cc581da..bce0141f2 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -182,7 +182,7 @@ func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Dura
}
-func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
@@ -191,15 +191,15 @@ func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecN
}
// find out all volume servers with at least one free slot.
- ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
+ ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter, diskType)
sortEcNodesByFreeslotsDescending(ecNodes)
return
}
-func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
- return collectEcNodesForDC(commandEnv, "")
+func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+ return collectEcNodesForDC(commandEnv, "", diskType)
}
func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
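With the added parameter, callers choose which disk type the EC node collection should consider instead of always scanning hard drives. A hypothetical call site, assuming the types.ToDiskType helper that maps a tag such as "ssd" to a DiskType:

    // Illustrative only: collect EC nodes with free slots on SSD disks.
    ecNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, types.ToDiskType("ssd"))
    if err != nil {
        return err
    }
    fmt.Printf("%d EC nodes, %d free SSD shard slots\n", len(ecNodes), totalFreeEcSlots)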
@@ -242,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
return collections
}
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool, diskType types.DiskType) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -280,8 +280,8 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
}
- destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
- existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
+ destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds, diskType)
+ existingLocation.deleteEcVolumeShards(vid, copiedShardIds, diskType)
return nil
@@ -421,13 +421,13 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
return 0
}
-func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
+func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string, diskType types.DiskType) (ecNodes []*EcNode, totalFreeEcSlots int) {
eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != string(dc) {
return
}
- freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
+ freeEcSlots := countFreeShardSlots(dn, diskType)
ecNode := &EcNode{
info: dn,
dc: dc,
@@ -439,17 +439,17 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
// Build disk-level information from volumes and EC shards
// First, discover all unique disk IDs from VolumeInfos (includes empty disks)
allDiskIds := make(map[uint32]string) // diskId -> diskType
- for diskType, diskInfo := range dn.DiskInfos {
+ for diskTypeKey, diskInfo := range dn.DiskInfos {
if diskInfo == nil {
continue
}
// Get all disk IDs from volumes
for _, vi := range diskInfo.VolumeInfos {
- allDiskIds[vi.DiskId] = diskType
+ allDiskIds[vi.DiskId] = diskTypeKey
}
// Also get disk IDs from EC shards
for _, ecShardInfo := range diskInfo.EcShardInfos {
- allDiskIds[ecShardInfo.DiskId] = diskType
+ allDiskIds[ecShardInfo.DiskId] = diskTypeKey
}
}
@@ -476,7 +476,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
}
freePerDisk := int(freeEcSlots) / diskCount
- for diskId, diskType := range allDiskIds {
+ for diskId, diskTypeStr := range allDiskIds {
shards := diskShards[diskId]
if shards == nil {
shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
@@ -488,7 +488,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
ecNode.disks[diskId] = &EcDisk{
diskId: diskId,
- diskType: diskType,
+ diskType: diskTypeStr,
freeEcSlots: freePerDisk,
ecShardCount: totalShardCount,
ecShards: shards,
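Note that free slots are attributed to disks by plain integer division, so the per-disk estimate is coarse. A toy illustration of the arithmetic above (values invented):

    // A node reporting 25 free EC slots over 4 discovered disks credits
    // each EcDisk with 25 / 4 = 6 free slots; the remainder is dropped.
    freeEcSlots, diskCount := 25, 4
    freePerDisk := freeEcSlots / diskCount // 6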
@@ -551,9 +551,9 @@ func ceilDivide(a, b int) int {
return (a / b) + r
}
-func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
+func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) erasure_coding.ShardBits {
- if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
return erasure_coding.ShardBits(shardInfo.EcIndexBits)
@@ -564,10 +564,10 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
return 0
}
-func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
+func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32, diskType types.DiskType) *EcNode {
foundVolume := false
- diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+ diskInfo, found := ecNode.info.DiskInfos[string(diskType)]
if found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
@@ -584,9 +584,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
}
} else {
diskInfo = &master_pb.DiskInfo{
- Type: string(types.HardDriveType),
+ Type: string(diskType),
}
- ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
+ ecNode.info.DiskInfos[string(diskType)] = diskInfo
}
if !foundVolume {
@@ -598,7 +598,7 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(newShardBits),
- DiskType: string(types.HardDriveType),
+ DiskType: string(diskType),
})
ecNode.freeEcSlot -= len(shardIds)
}
@@ -606,9 +606,9 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
return ecNode
}
-func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
+func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode {
- if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
@@ -649,6 +649,7 @@ type ecBalancer struct {
replicaPlacement *super_block.ReplicaPlacement
applyBalancing bool
maxParallelization int
+ diskType types.DiskType // target disk type for EC shards (default: HardDriveType)
}
func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
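The documented default falls out of the zero value if, as in the types package, the hard-drive disk type is the empty string. A sketch of that assumption:

    // Assumption: DiskType is a string alias whose zero value denotes
    // the hard-drive type, so an ecBalancer whose diskType field is
    // never set still targets HDDs:
    //     type DiskType string
    //     const HardDriveType DiskType = ""
    var ecb ecBalancer
    fmt.Println(ecb.diskType == types.HardDriveType) // true when left unset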
@@ -705,7 +706,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
// Use MaxShardCount (32) to support custom EC ratios
shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount)
for _, ecNode := range locations {
- shardBits := findEcVolumeShards(ecNode, vid)
+ shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
for _, shardId := range shardBits.ShardIds() {
shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
}
@@ -728,7 +729,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
- ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
+ ecNode.deleteEcVolumeShards(vid, duplicatedShardIds, ecb.diskType)
}
}
return nil
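Deduplication keeps one copy per shard id and removes the rest, updating the in-memory model with the same disk type so later passes do not re-count deleted shards. A minimal sketch of the duplicate detection over the shardToLocations index built above (kept-copy policy simplified):

    for shardId, locs := range shardToLocations {
        if len(locs) <= 1 {
            continue // shard is unique or absent: nothing to deduplicate
        }
        // one location is kept; shards on the remaining nodes are
        // unmounted, deleted on the source server, and removed from
        // the in-memory model via deleteEcVolumeShards(vid, ..., diskType)
        _ = shardId
    }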
@@ -748,9 +749,9 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
return ewg.Wait()
}
-func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
+func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int {
return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
- shardBits := findEcVolumeShards(ecNode, vid)
+ shardBits := findEcVolumeShards(ecNode, vid, diskType)
return string(ecNode.rack), shardBits.ShardIdCount()
})
}
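countShardsByRack now threads the disk type down to findEcVolumeShards, so per-rack totals only count shards on the targeted medium. An illustrative result shape (rack ids and counts invented):

    rackToShardCount := countShardsByRack(vid, locations, types.SsdType)
    // e.g. map[string]int{"rack-a": 6, "rack-b": 4, "rack-c": 4}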
@@ -759,7 +760,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
racks := ecb.racks()
// determine how many racks the volume's shards span, and how many shards each rack holds
- rackToShardCount := countShardsByRack(vid, locations)
+ rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
// Calculate actual total shards for this volume (not hardcoded default)
var totalShardsForVolume int
@@ -779,7 +780,7 @@ func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needl
continue
}
possibleEcNodes := rackEcNodesWithVid[rackId]
- for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
+ for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack, ecb.diskType) {
ecShardsToMove[shardId] = ecNode
}
}
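Racks above the rounded-up average give up only their surplus. A worked example of the arithmetic (counts invented):

    averageShardsPerEcRack := ceilDivide(14, 4) // 14 shards, 4 racks -> 4
    surplus := 6 - averageShardsPerEcRack       // a 6-shard rack moves 2 shards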
@@ -856,7 +857,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
for vid, locations := range vidLocations {
// determine how many racks the volume's shards span, and how many shards each rack holds
- rackToShardCount := countShardsByRack(vid, locations)
+ rackToShardCount := countShardsByRack(vid, locations, ecb.diskType)
rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
return string(ecNode.rack)
})
@@ -865,7 +866,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
var possibleDestinationEcNodes []*EcNode
for _, n := range racks[RackId(rackId)].ecNodes {
- if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
+ if _, found := n.info.DiskInfos[string(ecb.diskType)]; found {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
}
@@ -882,7 +883,7 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
for _, ecNode := range existingLocations {
- shardBits := findEcVolumeShards(ecNode, vid)
+ shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
for _, shardId := range shardBits.ShardIds() {
@@ -927,7 +928,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
}
ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
- diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+ diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
if !found {
return
}
@@ -955,17 +956,18 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
emptyNodeIds := make(map[uint32]bool)
- if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(ecb.diskType)]; found {
for _, shards := range emptyDiskInfo.EcShardInfos {
emptyNodeIds[shards.Id] = true
}
}
- if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
vid := needle.VolumeId(shards.Id)
- destDiskId := pickBestDiskOnNode(emptyNode, vid)
+ // For balancing, strictly require matching disk type
+ destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true)
if destDiskId > 0 {
fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
@@ -973,7 +975,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
}
- err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
+ err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType)
if err != nil {
return err
}
@@ -1003,7 +1005,7 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
nodeShards := map[*EcNode]int{}
for _, node := range possibleDestinations {
- nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
+ nodeShards[node] = findEcVolumeShards(node, vid, ecb.diskType).ShardIdCount()
}
targets := []*EcNode{}
@@ -1078,14 +1080,17 @@ func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
}
// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
-// It prefers disks with fewer shards and more free slots
-func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+// It prefers disks of the specified type with fewer shards and more free slots
+// If strictDiskType is false, it will fall back to other disk types if no matching disk is found
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType, strictDiskType bool) uint32 {
if len(ecNode.disks) == 0 {
return 0 // No disk info available, let the server decide
}
var bestDiskId uint32
bestScore := -1
+ var fallbackDiskId uint32
+ fallbackScore := -1
for diskId, disk := range ecNode.disks {
if disk.freeEcSlots <= 0 {
@@ -1102,13 +1107,26 @@ func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
// Lower score is better
score := disk.ecShardCount*10 + existingShards*100
- if bestScore == -1 || score < bestScore {
- bestScore = score
- bestDiskId = diskId
+ if disk.diskType == string(diskType) {
+ // Matching disk type - this is preferred
+ if bestScore == -1 || score < bestScore {
+ bestScore = score
+ bestDiskId = diskId
+ }
+ } else if !strictDiskType {
+ // Non-matching disk type - use as fallback if allowed
+ if fallbackScore == -1 || score < fallbackScore {
+ fallbackScore = score
+ fallbackDiskId = diskId
+ }
}
}
- return bestDiskId
+ // Return the disk whose type matches if found, otherwise the fallback
+ if bestDiskId != 0 {
+ return bestDiskId
+ }
+ return fallbackDiskId
}
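The score weights shards of the same volume ten times heavier than a disk's total shard count, so spreading a volume across disks beats raw emptiness; a returned id of 0 doubles as "no suitable disk", matching the "let the server decide" convention above. A toy comparison (numbers invented):

    // score = ecShardCount*10 + existingShards*100; lower wins.
    scoreA := 3*10 + 0*100 // 30: busier disk, no shards of this volume yet
    scoreB := 1*10 + 1*100 // 110: emptier disk, already holds this volume
    // The disk behind scoreA is chosen despite holding more shards overall.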
// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
@@ -1118,7 +1136,8 @@ func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId,
return nil, 0, err
}
- diskId := pickBestDiskOnNode(node, vid)
+ // For balancing, strictly require matching disk type
+ diskId := pickBestDiskOnNode(node, vid, ecb.diskType, true)
return node, diskId, nil
}
@@ -1134,14 +1153,14 @@ func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, co
} else {
fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
}
- return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
+ return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing, ecb.diskType)
}
-func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
+func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, diskType types.DiskType) map[erasure_coding.ShardId]*EcNode {
picked := make(map[erasure_coding.ShardId]*EcNode)
var candidateEcNodes []*CandidateEcNode
for _, ecNode := range ecNodes {
- shardBits := findEcVolumeShards(ecNode, vid)
+ shardBits := findEcVolumeShards(ecNode, vid, diskType)
if shardBits.ShardIdCount() > 0 {
candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
ecNode: ecNode,
@@ -1155,13 +1174,13 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
for i := 0; i < n; i++ {
selectedEcNodeIndex := -1
for i, candidateEcNode := range candidateEcNodes {
- shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
+ shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid, diskType)
if shardBits > 0 {
selectedEcNodeIndex = i
for _, shardId := range shardBits.ShardIds() {
candidateEcNode.shardCount--
picked[shardId] = candidateEcNode.ecNode
- candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
+ candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}, diskType)
break
}
break
@@ -1180,7 +1199,7 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
vidLocations := make(map[needle.VolumeId][]*EcNode)
for _, ecNode := range ecb.ecNodes {
- diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+ diskInfo, found := ecNode.info.DiskInfos[string(ecb.diskType)]
if !found {
continue
}
@@ -1194,9 +1213,9 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
return vidLocations
}
-func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
+func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, diskType types.DiskType, maxParallelization int, applyBalancing bool) (err error) {
// collect all ec nodes
- allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
+ allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc, diskType)
if err != nil {
return err
}
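Callers of EcBalance now pass the disk type between the replica placement and parallelization arguments. A hypothetical call site (the diskTypeStr flag variable is illustrative):

    err := EcBalance(commandEnv, collections, *dc, ecReplicaPlacement,
        types.ToDiskType(*diskTypeStr), *maxParallelization, *applyBalancing)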
@@ -1210,6 +1229,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
replicaPlacement: ecReplicaPlacement,
applyBalancing: applyBalancing,
maxParallelization: maxParallelization,
+ diskType: diskType,
}
if len(collections) == 0 {