path: root/weed/shell/command_ec_common.go
author     Chris Lu <chrislusf@users.noreply.github.com>    2025-12-02 12:30:15 -0800
committer  GitHub <noreply@github.com>    2025-12-02 12:30:15 -0800
commit     4f038820dc480a7374b928a17cc9121474ba4172 (patch)
tree       d15e1dade3537fbd462e819403b3347433ff37bd /weed/shell/command_ec_common.go
parent     ebb06a3908990c31dcfb4995a69682d836228179 (diff)
Add disk-aware EC rebalancing (#7597)
* Add placement package for EC shard placement logic
  - Consolidate EC shard placement algorithm for reuse across shell and worker tasks
  - Support multi-pass selection: racks, then servers, then disks
  - Include proper spread verification and scoring functions
  - Comprehensive test coverage for various cluster topologies

* Make ec.balance disk-aware for multi-disk servers
  - Add EcDisk struct to track individual disks on volume servers
  - Update EcNode to maintain per-disk shard distribution
  - Parse disk_id from EC shard information during topology collection
  - Implement pickBestDiskOnNode() for selecting best disk per shard
  - Add diskDistributionScore() for tie-breaking node selection
  - Update all move operations to specify target disk in RPC calls
  - Improves shard balance within multi-disk servers, not just across servers

* Use placement package in EC detection for consistent disk-level placement
  - Replace custom EC disk selection logic with shared placement package
  - Convert topology DiskInfo to placement.DiskCandidate format
  - Use SelectDestinations() for multi-rack/server/disk spreading
  - Convert placement results back to topology DiskInfo for task creation
  - Ensures EC detection uses same placement logic as shell commands

* Make volume server evacuation disk-aware
  - Use pickBestDiskOnNode() when selecting evacuation target disk
  - Specify target disk in evacuation RPC requests
  - Maintains balanced disk distribution during server evacuations

* Rename PlacementConfig to PlacementRequest for clarity
  PlacementRequest better reflects that this is a request for placement rather than a configuration object. This improves API semantics.

* Rename DefaultConfig to DefaultPlacementRequest
  Aligns with the PlacementRequest type naming for consistency

* Address review comments from Gemini and CodeRabbit
  Fix HIGH issues:
  - Fix empty disk discovery: Now discovers all disks from VolumeInfos, not just from EC shards. This ensures disks without EC shards are still considered for placement.
  - Fix EC shard count calculation in detection.go: Now correctly filters by DiskId and sums actual shard counts using ShardBits.ShardIdCount() instead of just counting EcShardInfo entries.
  Fix MEDIUM issues:
  - Add disk ID to evacuation log messages for consistency with other logging
  - Remove unused serverToDisks variable in placement.go
  - Fix comment that incorrectly said 'ascending' when sorting is 'descending'

* add ec tests

* Update ec-integration-tests.yml

* Update ec_integration_test.go

* Fix EC integration tests CI: build weed binary and update actions
  - Add 'Build weed binary' step before running tests
  - Update actions/setup-go from v4 to v6 (Node20 compatibility)
  - Update actions/checkout from v2 to v4 (Node20 compatibility)
  - Move working-directory to test step only

* Add disk-aware EC rebalancing integration tests
  - Add TestDiskAwareECRebalancing test with multi-disk cluster setup
  - Test EC encode with disk awareness (shows disk ID in output)
  - Test EC balance with disk-level shard distribution
  - Add helper functions for disk-level verification:
    - startMultiDiskCluster: 3 servers x 4 disks each
    - countShardsPerDisk: track shards per disk per server
    - calculateDiskShardVariance: measure distribution balance
  - Verify no single disk is overloaded with shards
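For orientation before reading the diff, here is a minimal, self-contained sketch of the disk-selection heuristic that the new pickBestDiskOnNode() applies; the full implementation is in the diff below. The toyDisk type and pickDisk function are hypothetical stand-ins for illustration only, not the EcNode/EcDisk structs added by this patch. The weighting mirrors the patch: a disk's total EC shard count is weighted by 10, each shard of the same volume already on the disk by 100, disks with no free slots are skipped, and the lowest score wins.

package main

import "fmt"

// toyDisk is a simplified stand-in for the patch's EcDisk.
type toyDisk struct {
    id              uint32
    freeEcSlots     int
    totalShardCount int // all EC shards on this disk
    shardsOfVolume  int // shards of the volume being placed that already live here
}

// pickDisk returns the id of the lowest-scoring disk that still has free slots,
// or 0 if none qualifies (the patch likewise treats 0 as "let the volume server decide").
func pickDisk(disks []toyDisk) uint32 {
    var bestId uint32
    bestScore := -1
    for _, d := range disks {
        if d.freeEcSlots <= 0 {
            continue
        }
        // Same weighting as the patch: total load x10, same-volume shards x100.
        score := d.totalShardCount*10 + d.shardsOfVolume*100
        if bestScore == -1 || score < bestScore {
            bestScore = score
            bestId = d.id
        }
    }
    return bestId
}

func main() {
    disks := []toyDisk{
        {id: 1, freeEcSlots: 4, totalShardCount: 12},
        {id: 2, freeEcSlots: 4, totalShardCount: 3, shardsOfVolume: 1}, // penalized: volume already here
        {id: 3, freeEcSlots: 4, totalShardCount: 5},
    }
    fmt.Println("chosen disk:", pickDisk(disks)) // chosen disk: 3 (scores: 120, 130, 50)
}

The scores work out to 120, 130, and 50, so disk 3 wins even though disk 2 holds fewer shards overall, because disk 2 already carries a shard of the volume being placed.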
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--   weed/shell/command_ec_common.go   190
1 file changed, 178 insertions, 12 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index f059b4e74..f2cc581da 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -26,12 +26,25 @@ type DataCenterId string
type EcNodeId string
type RackId string
+// EcDisk represents a single disk on a volume server
+type EcDisk struct {
+ diskId uint32
+ diskType string
+ freeEcSlots int
+ ecShardCount int // Total EC shards on this disk
+ // Map of volumeId -> shardBits for shards on this disk
+ ecShards map[needle.VolumeId]erasure_coding.ShardBits
+}
+
type EcNode struct {
info *master_pb.DataNodeInfo
dc DataCenterId
rack RackId
freeEcSlot int
+ // disks maps diskId -> EcDisk for disk-level balancing
+ disks map[uint32]*EcDisk
}
+
type CandidateEcNode struct {
ecNode *EcNode
shardCount int
@@ -229,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
return collections
}
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -242,7 +255,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
// ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId)
if err != nil {
return err
}
@@ -259,7 +272,11 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
return err
}
- fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ if destDiskId > 0 {
+ fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ }
}
@@ -272,7 +289,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
- volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
+ volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
@@ -289,6 +306,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(existingLocation),
+ DiskId: destDiskId,
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
@@ -410,12 +428,74 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
}
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
- ecNodes = append(ecNodes, &EcNode{
+ ecNode := &EcNode{
info: dn,
dc: dc,
rack: rack,
freeEcSlot: int(freeEcSlots),
- })
+ disks: make(map[uint32]*EcDisk),
+ }
+
+ // Build disk-level information from volumes and EC shards
+ // First, discover all unique disk IDs from VolumeInfos (includes empty disks)
+ allDiskIds := make(map[uint32]string) // diskId -> diskType
+ for diskType, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ // Get all disk IDs from volumes
+ for _, vi := range diskInfo.VolumeInfos {
+ allDiskIds[vi.DiskId] = diskType
+ }
+ // Also get disk IDs from EC shards
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ allDiskIds[ecShardInfo.DiskId] = diskType
+ }
+ }
+
+ // Group EC shards by disk_id
+ diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits)
+ for _, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ diskId := ecShardInfo.DiskId
+ if diskShards[diskId] == nil {
+ diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ vid := needle.VolumeId(ecShardInfo.Id)
+ diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ }
+ }
+
+ // Create EcDisk for each discovered disk
+ diskCount := len(allDiskIds)
+ if diskCount == 0 {
+ diskCount = 1
+ }
+ freePerDisk := int(freeEcSlots) / diskCount
+
+ for diskId, diskType := range allDiskIds {
+ shards := diskShards[diskId]
+ if shards == nil {
+ shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ totalShardCount := 0
+ for _, shardBits := range shards {
+ totalShardCount += shardBits.ShardIdCount()
+ }
+
+ ecNode.disks[diskId] = &EcDisk{
+ diskId: diskId,
+ diskType: diskType,
+ freeEcSlots: freePerDisk,
+ ecShardCount: totalShardCount,
+ ecShards: shards,
+ }
+ }
+
+ ecNodes = append(ecNodes, ecNode)
totalFreeEcSlots += freeEcSlots
})
return
@@ -884,10 +964,16 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
- fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
- err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
+ vid := needle.VolumeId(shards.Id)
+ destDiskId := pickBestDiskOnNode(emptyNode, vid)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+ }
+ err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
if err != nil {
return err
}
@@ -957,18 +1043,98 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
if len(targets) == 0 {
return nil, errors.New(details)
}
+
+ // When multiple nodes have the same shard count, prefer nodes with better disk distribution
+ // (i.e., nodes with more disks that have fewer shards of this volume)
+ if len(targets) > 1 {
+ slices.SortFunc(targets, func(a, b *EcNode) int {
+ aScore := diskDistributionScore(a, vid)
+ bScore := diskDistributionScore(b, vid)
+ return aScore - bScore // Lower score is better
+ })
+ return targets[0], nil
+ }
+
return targets[rand.IntN(len(targets))], nil
}
+// diskDistributionScore calculates a score for how well-distributed shards are on the node's disks
+// Lower score is better (means more room for balanced distribution)
+func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
+ if len(ecNode.disks) == 0 {
+ return 0
+ }
+
+ // Sum the existing shard count for this volume on each disk
+ // Lower total means more room for new shards
+ score := 0
+ for _, disk := range ecNode.disks {
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily
+ }
+ score += disk.ecShardCount // Also consider total shards on disk
+ }
+ return score
+}
+
+// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
+// It prefers disks with fewer shards and more free slots
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+ if len(ecNode.disks) == 0 {
+ return 0 // No disk info available, let the server decide
+ }
+
+ var bestDiskId uint32
+ bestScore := -1
+
+ for diskId, disk := range ecNode.disks {
+ if disk.freeEcSlots <= 0 {
+ continue
+ }
+
+ // Check if this volume already has shards on this disk
+ existingShards := 0
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ existingShards = shardBits.ShardIdCount()
+ }
+
+ // Score: prefer disks with fewer total shards and fewer shards of this volume
+ // Lower score is better
+ score := disk.ecShardCount*10 + existingShards*100
+
+ if bestScore == -1 || score < bestScore {
+ bestScore = score
+ bestDiskId = diskId
+ }
+ }
+
+ return bestDiskId
+}
+
+// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
+func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) {
+ node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ diskId := pickBestDiskOnNode(node, vid)
+ return node, diskId, nil
+}
+
func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
- destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
+ destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
if err != nil {
- fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
+ fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
return nil
}
- fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
- return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", existingLocation.info.Id, vid, shardId, destNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+ }
+ return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
}
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
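To round off the excerpt, here is a small worked example of the tie-breaking score that diskDistributionScore introduces above. The types and numbers are illustrative toy stand-ins, not the real EcNode/EcDisk structs. Per the patch, each disk on a candidate node contributes 10 points per shard of the target volume it already holds plus one point per EC shard of any volume, and when several candidate nodes tie on shard count the node with the lowest total is preferred.

package main

import "fmt"

// toyDisk is a simplified stand-in for the patch's per-disk bookkeeping.
type toyDisk struct {
    totalShards    int // all EC shards on the disk
    shardsOfVolume int // shards of the target volume already on the disk
}

// nodeScore mirrors diskDistributionScore: lower means more room for balanced placement.
func nodeScore(disks []toyDisk) int {
    score := 0
    for _, d := range disks {
        score += d.shardsOfVolume*10 + d.totalShards
    }
    return score
}

func main() {
    // Node A already holds a shard of the target volume on one of its disks.
    a := []toyDisk{{totalShards: 4, shardsOfVolume: 1}, {totalShards: 4}}
    // Node B carries the same total load but none of the target volume's shards.
    b := []toyDisk{{totalShards: 4}, {totalShards: 4}}
    fmt.Println("node A:", nodeScore(a), "node B:", nodeScore(b)) // node A: 18 node B: 8
}

With equal shard counts, node B's lower score makes it the preferred destination, which is the tie-break performed by the sort in pickEcNodeToBalanceShardsInto.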