Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--  weed/shell/command_ec_common.go  190
1 file changed, 178 insertions, 12 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index f059b4e74..f2cc581da 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -26,12 +26,25 @@ type DataCenterId string
type EcNodeId string
type RackId string
+// EcDisk represents a single disk on a volume server
+type EcDisk struct {
+ diskId uint32
+ diskType string
+ freeEcSlots int
+ ecShardCount int // Total EC shards on this disk
+ // Map of volumeId -> shardBits for shards on this disk
+ ecShards map[needle.VolumeId]erasure_coding.ShardBits
+}
+
type EcNode struct {
info *master_pb.DataNodeInfo
dc DataCenterId
rack RackId
freeEcSlot int
+ // disks maps diskId -> EcDisk for disk-level balancing
+ disks map[uint32]*EcDisk
}
+
type CandidateEcNode struct {
ecNode *EcNode
shardCount int
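
For orientation (not part of this diff), a minimal sketch of what the new fields could hold for a node with two disks; the values are hypothetical and the snippet assumes this file's package plus its existing needle and erasure_coding imports:

// Illustrative only: volume 5 has shards 0, 1 and 2 on disk 0; disk 1 is still empty.
node := &EcNode{
	freeEcSlot: 10,
	disks: map[uint32]*EcDisk{
		0: {diskId: 0, diskType: "hdd", freeEcSlots: 5, ecShardCount: 3,
			ecShards: map[needle.VolumeId]erasure_coding.ShardBits{
				needle.VolumeId(5): erasure_coding.ShardBits(0b111), // bits set for shards 0, 1, 2
			}},
		1: {diskId: 1, diskType: "hdd", freeEcSlots: 5, ecShardCount: 0,
			ecShards: map[needle.VolumeId]erasure_coding.ShardBits{}},
	},
}
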
@@ -229,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
return collections
}
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -242,7 +255,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
// ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId)
if err != nil {
return err
}
@@ -259,7 +272,11 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
return err
}
- fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ if destDiskId > 0 {
+ fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ }
}
@@ -272,7 +289,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
- volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
+ volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
@@ -289,6 +306,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(existingLocation),
+ DiskId: destDiskId,
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
@@ -410,12 +428,74 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
}
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
- ecNodes = append(ecNodes, &EcNode{
+ ecNode := &EcNode{
info: dn,
dc: dc,
rack: rack,
freeEcSlot: int(freeEcSlots),
- })
+ disks: make(map[uint32]*EcDisk),
+ }
+
+ // Build disk-level information from volumes and EC shards
+ // First, discover all unique disk IDs from VolumeInfos (includes empty disks)
+ allDiskIds := make(map[uint32]string) // diskId -> diskType
+ for diskType, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ // Get all disk IDs from volumes
+ for _, vi := range diskInfo.VolumeInfos {
+ allDiskIds[vi.DiskId] = diskType
+ }
+ // Also get disk IDs from EC shards
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ allDiskIds[ecShardInfo.DiskId] = diskType
+ }
+ }
+
+ // Group EC shards by disk_id
+ diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits)
+ for _, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ diskId := ecShardInfo.DiskId
+ if diskShards[diskId] == nil {
+ diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ vid := needle.VolumeId(ecShardInfo.Id)
+ diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ }
+ }
+
+ // Create EcDisk for each discovered disk
+ diskCount := len(allDiskIds)
+ if diskCount == 0 {
+ diskCount = 1
+ }
+ freePerDisk := int(freeEcSlots) / diskCount
+
+ for diskId, diskType := range allDiskIds {
+ shards := diskShards[diskId]
+ if shards == nil {
+ shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ totalShardCount := 0
+ for _, shardBits := range shards {
+ totalShardCount += shardBits.ShardIdCount()
+ }
+
+ ecNode.disks[diskId] = &EcDisk{
+ diskId: diskId,
+ diskType: diskType,
+ freeEcSlots: freePerDisk,
+ ecShardCount: totalShardCount,
+ ecShards: shards,
+ }
+ }
+
+ ecNodes = append(ecNodes, ecNode)
totalFreeEcSlots += freeEcSlots
})
return
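
One consequence of the loop above is that per-disk free capacity is modeled as an even split of the node's total: freeEcSlots is divided by the number of discovered disks, and any remainder is dropped. A minimal sketch (not part of this change) of that arithmetic in isolation:

// Illustrative only: a node reporting 22 free EC slots across 4 discovered disks
// is modeled as 22 / 4 = 5 free slots per disk; the remaining 2 slots go unassigned.
func splitFreeSlotsEvenly(totalFreeSlots, diskCount int) int {
	if diskCount == 0 {
		diskCount = 1 // mirror the guard above: a node with no reported disks is treated as one disk
	}
	return totalFreeSlots / diskCount
}
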
@@ -884,10 +964,16 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
- fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
- err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
+ vid := needle.VolumeId(shards.Id)
+ destDiskId := pickBestDiskOnNode(emptyNode, vid)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+ }
+ err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
if err != nil {
return err
}
@@ -957,18 +1043,98 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
if len(targets) == 0 {
return nil, errors.New(details)
}
+
+ // When multiple nodes have the same shard count, prefer nodes with better disk distribution
+ // (i.e., nodes with more disks that have fewer shards of this volume)
+ if len(targets) > 1 {
+ slices.SortFunc(targets, func(a, b *EcNode) int {
+ aScore := diskDistributionScore(a, vid)
+ bScore := diskDistributionScore(b, vid)
+ return aScore - bScore // Lower score is better
+ })
+ return targets[0], nil
+ }
+
return targets[rand.IntN(len(targets))], nil
}
+// diskDistributionScore calculates a score for how well-distributed shards are on the node's disks
+// Lower score is better (means more room for balanced distribution)
+func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
+ if len(ecNode.disks) == 0 {
+ return 0
+ }
+
+ // Sum the existing shard count for this volume on each disk
+ // Lower total means more room for new shards
+ score := 0
+ for _, disk := range ecNode.disks {
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily
+ }
+ score += disk.ecShardCount // Also consider total shards on disk
+ }
+ return score
+}
+
+// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
+// It prefers disks with fewer shards and more free slots
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+ if len(ecNode.disks) == 0 {
+ return 0 // No disk info available, let the server decide
+ }
+
+ var bestDiskId uint32
+ bestScore := -1
+
+ for diskId, disk := range ecNode.disks {
+ if disk.freeEcSlots <= 0 {
+ continue
+ }
+
+ // Check if this volume already has shards on this disk
+ existingShards := 0
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ existingShards = shardBits.ShardIdCount()
+ }
+
+ // Score: prefer disks with fewer total shards and fewer shards of this volume
+ // Lower score is better
+ score := disk.ecShardCount*10 + existingShards*100
+
+ if bestScore == -1 || score < bestScore {
+ bestScore = score
+ bestDiskId = diskId
+ }
+ }
+
+ return bestDiskId
+}
+
+// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
+func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) {
+ node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ diskId := pickBestDiskOnNode(node, vid)
+ return node, diskId, nil
+}
+
func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
- destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
+ destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
if err != nil {
- fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
+ fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
return nil
}
- fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
- return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", existingLocation.info.Id, vid, shardId, destNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+ }
+ return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
}
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
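
To make the new scoring concrete: diskDistributionScore sums, per disk, ten points for each shard of the candidate volume plus one point per shard of any volume, while pickBestDiskOnNode weights total shards by 10 and same-volume shards by 100 and takes the lowest score. A minimal sketch (not part of this diff, hypothetical values) of how pickBestDiskOnNode would choose between two disks:

// Illustrative only: disk 0 holds 3 shards, one of them for volume 7 -> score 3*10 + 1*100 = 130.
// Disk 1 holds 5 shards, none for volume 7 -> score 5*10 + 0*100 = 50.
// Disk 1 wins despite holding more shards overall, because co-locating shards of the
// same volume on one disk is penalized ten times more heavily than raw shard count.
func exampleDiskChoice() uint32 {
	node := &EcNode{disks: map[uint32]*EcDisk{
		0: {diskId: 0, freeEcSlots: 4, ecShardCount: 3, ecShards: map[needle.VolumeId]erasure_coding.ShardBits{
			needle.VolumeId(7): erasure_coding.ShardBits(1 << 2), // one shard of volume 7
		}},
		1: {diskId: 1, freeEcSlots: 4, ecShardCount: 5, ecShards: map[needle.VolumeId]erasure_coding.ShardBits{}},
	}}
	return pickBestDiskOnNode(node, needle.VolumeId(7)) // returns 1
}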