| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-12-02 12:30:15 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-02 12:30:15 -0800 |
| commit | 4f038820dc480a7374b928a17cc9121474ba4172 (patch) | |
| tree | d15e1dade3537fbd462e819403b3347433ff37bd /weed/shell/command_ec_common.go | |
| parent | ebb06a3908990c31dcfb4995a69682d836228179 (diff) | |
Add disk-aware EC rebalancing (#7597)
* Add placement package for EC shard placement logic
- Consolidate EC shard placement algorithm for reuse across shell and worker tasks
- Support multi-pass selection: racks, then servers, then disks
- Include proper spread verification and scoring functions
- Comprehensive test coverage for various cluster topologies
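As an illustration of the multi-pass idea, here is a minimal Go sketch. The real placement package exposes PlacementRequest, DiskCandidate, and SelectDestinations(); the types, field names, and selection loop below are simplified assumptions rather than the package's actual API.

```go
// Hypothetical sketch of rack -> server -> disk multi-pass selection.
// diskCandidate and selectDestinations are stand-ins, not the real API.
package main

import "fmt"

type diskCandidate struct {
	Rack   string
	Server string
	DiskId uint32
}

func diskKey(c diskCandidate) string { return fmt.Sprintf("%s/%d", c.Server, c.DiskId) }

// selectDestinations places n shards by always choosing the candidate whose
// rack, then server, then disk currently holds the fewest selected shards.
func selectDestinations(candidates []diskCandidate, n int) []diskCandidate {
	if len(candidates) == 0 {
		return nil
	}
	rackLoad := map[string]int{}
	serverLoad := map[string]int{}
	diskLoad := map[string]int{}
	var picked []diskCandidate

	for i := 0; i < n; i++ {
		best := 0
		for j := 1; j < len(candidates); j++ {
			c, b := candidates[j], candidates[best]
			cKey := [3]int{rackLoad[c.Rack], serverLoad[c.Server], diskLoad[diskKey(c)]}
			bKey := [3]int{rackLoad[b.Rack], serverLoad[b.Server], diskLoad[diskKey(b)]}
			for k := 0; k < 3; k++ {
				if cKey[k] != bKey[k] {
					if cKey[k] < bKey[k] {
						best = j
					}
					break
				}
			}
		}
		c := candidates[best]
		rackLoad[c.Rack]++
		serverLoad[c.Server]++
		diskLoad[diskKey(c)]++
		picked = append(picked, c)
	}
	return picked
}

func main() {
	candidates := []diskCandidate{
		{"rack1", "serverA", 0}, {"rack1", "serverA", 1},
		{"rack2", "serverB", 0}, {"rack3", "serverC", 0},
	}
	for _, d := range selectDestinations(candidates, 4) {
		fmt.Printf("%s %s disk %d\n", d.Rack, d.Server, d.DiskId)
	}
}
```

Running the sketch spreads the first three shards across the three racks before reusing a second disk on serverA, which is the spreading order described above.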
* Make ec.balance disk-aware for multi-disk servers
- Add EcDisk struct to track individual disks on volume servers
- Update EcNode to maintain per-disk shard distribution
- Parse disk_id from EC shard information during topology collection
- Implement pickBestDiskOnNode() for selecting best disk per shard
- Add diskDistributionScore() for tie-breaking node selection
- Update all move operations to specify target disk in RPC calls
- Improves shard balance within multi-disk servers, not just across servers
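The scoring behind pickBestDiskOnNode() appears in full in the diff further down; as a standalone illustration, this sketch reproduces the same rule with simplified stand-in types (total shards on a disk weighted x10, shards of the volume being placed weighted x100, lower score wins, full disks skipped).

```go
// Sketch of the per-disk scoring used when picking a target disk.
// diskStats is a simplified stand-in for the real EcDisk struct.
package main

import "fmt"

type diskStats struct {
	totalEcShards  int // all EC shards already on this disk
	shardsOfVolume int // shards of the volume being placed that already live here
	freeEcSlots    int
}

// pickBestDisk returns the id of the lowest-scoring disk, skipping full disks.
// A zero return means "no usable disk info; let the volume server decide".
func pickBestDisk(disks map[uint32]diskStats) uint32 {
	var bestId uint32
	bestScore := -1
	for id, d := range disks {
		if d.freeEcSlots <= 0 {
			continue
		}
		score := d.totalEcShards*10 + d.shardsOfVolume*100
		if bestScore == -1 || score < bestScore {
			bestScore, bestId = score, id
		}
	}
	return bestId
}

func main() {
	disks := map[uint32]diskStats{
		1: {totalEcShards: 40, shardsOfVolume: 0, freeEcSlots: 5}, // score 400
		2: {totalEcShards: 10, shardsOfVolume: 2, freeEcSlots: 5}, // score 300
		3: {totalEcShards: 12, shardsOfVolume: 0, freeEcSlots: 5}, // score 120
	}
	fmt.Println("best disk:", pickBestDisk(disks)) // prints 3
}
```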
* Use placement package in EC detection for consistent disk-level placement
- Replace custom EC disk selection logic with shared placement package
- Convert topology DiskInfo to placement.DiskCandidate format
- Use SelectDestinations() for multi-rack/server/disk spreading
- Convert placement results back to topology DiskInfo for task creation
- Ensures EC detection uses same placement logic as shell commands
* Make volume server evacuation disk-aware
- Use pickBestDiskOnNode() when selecting evacuation target disk
- Specify target disk in evacuation RPC requests
- Maintains balanced disk distribution during server evacuations
* Rename PlacementConfig to PlacementRequest for clarity
PlacementRequest better reflects that this is a request for placement
rather than a configuration object. This improves API semantics.
* Rename DefaultConfig to DefaultPlacementRequest
Aligns with the PlacementRequest type naming for consistency
* Address review comments from Gemini and CodeRabbit
Fix HIGH issues:
- Fix empty disk discovery: Now discovers all disks from VolumeInfos,
not just from EC shards. This ensures disks without EC shards are
still considered for placement.
- Fix EC shard count calculation in detection.go: Now correctly filters
by DiskId and sums actual shard counts using ShardBits.ShardIdCount()
instead of just counting EcShardInfo entries (a sketch follows this list).
Fix MEDIUM issues:
- Add disk ID to evacuation log messages for consistency with other logging
- Remove unused serverToDisks variable in placement.go
- Fix comment that incorrectly said 'ascending' when sorting is 'descending'
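A minimal standalone sketch of the corrected count described above: filter shard infos by disk id and sum the set bits of each shard bitmap instead of counting entries. shardInfo here is a simplified stand-in for the real master_pb.EcShardInfo, and bits.OnesCount32 plays the role of ShardBits.ShardIdCount().

```go
// Corrected per-disk shard count: filter by disk id, then sum shard bits.
package main

import (
	"fmt"
	"math/bits"
)

type shardInfo struct {
	DiskId      uint32
	EcIndexBits uint32 // one bit per shard present, like erasure_coding.ShardBits
}

func ecShardCountOnDisk(infos []shardInfo, diskId uint32) int {
	count := 0
	for _, si := range infos {
		if si.DiskId != diskId {
			continue
		}
		count += bits.OnesCount32(si.EcIndexBits) // ShardIdCount() equivalent
	}
	return count
}

func main() {
	infos := []shardInfo{
		{DiskId: 0, EcIndexBits: 0b0000011}, // 2 shards on disk 0
		{DiskId: 1, EcIndexBits: 0b1110000}, // 3 shards on disk 1
		{DiskId: 1, EcIndexBits: 0b0001100}, // 2 more shards on disk 1
	}
	fmt.Println(ecShardCountOnDisk(infos, 1)) // 5, not 2 (the entry count)
}
```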
* Add EC tests
* Update ec-integration-tests.yml
* Update ec_integration_test.go
* Fix EC integration tests CI: build weed binary and update actions
- Add 'Build weed binary' step before running tests
- Update actions/setup-go from v4 to v6 (Node20 compatibility)
- Update actions/checkout from v2 to v4 (Node20 compatibility)
- Move working-directory to test step only
* Add disk-aware EC rebalancing integration tests
- Add TestDiskAwareECRebalancing test with multi-disk cluster setup
- Test EC encode with disk awareness (shows disk ID in output)
- Test EC balance with disk-level shard distribution
- Add helper functions for disk-level verification:
- startMultiDiskCluster: 3 servers x 4 disks each
- countShardsPerDisk: track shards per disk per server
- calculateDiskShardVariance: measure distribution balance (sketched below)
- Verify no single disk is overloaded with shards
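The helper names above come from the test description; the sketch below is a hypothetical take on how such a variance helper could be written against a server -> disk -> shard-count map, not the test's actual implementation.

```go
// Hypothetical disk shard variance helper: given shard counts per disk per
// server, compute the variance of the per-disk counts so a test can assert
// that no single disk is overloaded.
package main

import "fmt"

func calculateDiskShardVariance(shardsPerDisk map[string]map[uint32]int) float64 {
	var counts []int
	for _, disks := range shardsPerDisk {
		for _, n := range disks {
			counts = append(counts, n)
		}
	}
	if len(counts) == 0 {
		return 0
	}
	mean := 0.0
	for _, n := range counts {
		mean += float64(n)
	}
	mean /= float64(len(counts))

	variance := 0.0
	for _, n := range counts {
		d := float64(n) - mean
		variance += d * d
	}
	return variance / float64(len(counts))
}

func main() {
	// 2 servers x 2 disks, 14 EC shards spread 4/3/4/3.
	shards := map[string]map[uint32]int{
		"server1": {0: 4, 1: 3},
		"server2": {0: 4, 1: 3},
	}
	fmt.Printf("variance: %.2f\n", calculateDiskShardVariance(shards)) // 0.25
}
```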
Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 190 |
1 file changed, 178 insertions(+), 12 deletions(-)
```diff
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index f059b4e74..f2cc581da 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -26,12 +26,25 @@ type DataCenterId string
 type EcNodeId string
 type RackId string
 
+// EcDisk represents a single disk on a volume server
+type EcDisk struct {
+    diskId       uint32
+    diskType     string
+    freeEcSlots  int
+    ecShardCount int // Total EC shards on this disk
+    // Map of volumeId -> shardBits for shards on this disk
+    ecShards map[needle.VolumeId]erasure_coding.ShardBits
+}
+
 type EcNode struct {
     info       *master_pb.DataNodeInfo
     dc         DataCenterId
     rack       RackId
     freeEcSlot int
+    // disks maps diskId -> EcDisk for disk-level balancing
+    disks map[uint32]*EcDisk
 }
+
 type CandidateEcNode struct {
     ecNode     *EcNode
     shardCount int
@@ -229,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
     return collections
 }
 
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
 
     if !commandEnv.isLocked() {
         return fmt.Errorf("lock is lost")
@@ -242,7 +255,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
         existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
 
         // ask destination node to copy shard and the ecx file from source node, and mount it
-        copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
+        copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId)
         if err != nil {
             return err
         }
@@ -259,7 +272,11 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
             return err
         }
 
-        fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+        if destDiskId > 0 {
+            fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId)
+        } else {
+            fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+        }
 
     }
 
@@ -272,7 +289,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
 
 func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
     targetServer *EcNode, shardIdsToCopy []uint32,
-    volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
+    volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) {
 
     fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
 
@@ -289,6 +306,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
             CopyEcjFile:    true,
             CopyVifFile:    true,
             SourceDataNode: string(existingLocation),
+            DiskId:         destDiskId,
         })
         if copyErr != nil {
             return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
@@ -410,12 +428,74 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
         }
 
         freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
-        ecNodes = append(ecNodes, &EcNode{
+        ecNode := &EcNode{
             info:       dn,
             dc:         dc,
             rack:       rack,
             freeEcSlot: int(freeEcSlots),
-        })
+            disks:      make(map[uint32]*EcDisk),
+        }
+
+        // Build disk-level information from volumes and EC shards
+        // First, discover all unique disk IDs from VolumeInfos (includes empty disks)
+        allDiskIds := make(map[uint32]string) // diskId -> diskType
+        for diskType, diskInfo := range dn.DiskInfos {
+            if diskInfo == nil {
+                continue
+            }
+            // Get all disk IDs from volumes
+            for _, vi := range diskInfo.VolumeInfos {
+                allDiskIds[vi.DiskId] = diskType
+            }
+            // Also get disk IDs from EC shards
+            for _, ecShardInfo := range diskInfo.EcShardInfos {
+                allDiskIds[ecShardInfo.DiskId] = diskType
+            }
+        }
+
+        // Group EC shards by disk_id
+        diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits)
+        for _, diskInfo := range dn.DiskInfos {
+            if diskInfo == nil {
+                continue
+            }
+            for _, ecShardInfo := range diskInfo.EcShardInfos {
+                diskId := ecShardInfo.DiskId
+                if diskShards[diskId] == nil {
+                    diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
+                }
+                vid := needle.VolumeId(ecShardInfo.Id)
+                diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+            }
+        }
+
+        // Create EcDisk for each discovered disk
+        diskCount := len(allDiskIds)
+        if diskCount == 0 {
+            diskCount = 1
+        }
+        freePerDisk := int(freeEcSlots) / diskCount
+
+        for diskId, diskType := range allDiskIds {
+            shards := diskShards[diskId]
+            if shards == nil {
+                shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
+            }
+            totalShardCount := 0
+            for _, shardBits := range shards {
+                totalShardCount += shardBits.ShardIdCount()
+            }
+
+            ecNode.disks[diskId] = &EcDisk{
+                diskId:       diskId,
+                diskType:     diskType,
+                freeEcSlots:  freePerDisk,
+                ecShardCount: totalShardCount,
+                ecShards:     shards,
+            }
+        }
+
+        ecNodes = append(ecNodes, ecNode)
         totalFreeEcSlots += freeEcSlots
     })
     return
@@ -884,10 +964,16 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
             for _, shards := range fullDiskInfo.EcShardInfos {
                 if _, found := emptyNodeIds[shards.Id]; !found {
                     for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
+                        vid := needle.VolumeId(shards.Id)
+                        destDiskId := pickBestDiskOnNode(emptyNode, vid)
 
-                        fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+                        if destDiskId > 0 {
+                            fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
+                        } else {
+                            fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+                        }
 
-                        err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
+                        err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
                         if err != nil {
                             return err
                         }
@@ -957,18 +1043,98 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
     if len(targets) == 0 {
         return nil, errors.New(details)
     }
+
+    // When multiple nodes have the same shard count, prefer nodes with better disk distribution
+    // (i.e., nodes with more disks that have fewer shards of this volume)
+    if len(targets) > 1 {
+        slices.SortFunc(targets, func(a, b *EcNode) int {
+            aScore := diskDistributionScore(a, vid)
+            bScore := diskDistributionScore(b, vid)
+            return aScore - bScore // Lower score is better
+        })
+        return targets[0], nil
+    }
+
     return targets[rand.IntN(len(targets))], nil
 }
 
+// diskDistributionScore calculates a score for how well-distributed shards are on the node's disks
+// Lower score is better (means more room for balanced distribution)
+func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
+    if len(ecNode.disks) == 0 {
+        return 0
+    }
+
+    // Sum the existing shard count for this volume on each disk
+    // Lower total means more room for new shards
+    score := 0
+    for _, disk := range ecNode.disks {
+        if shardBits, ok := disk.ecShards[vid]; ok {
+            score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily
+        }
+        score += disk.ecShardCount // Also consider total shards on disk
+    }
+    return score
+}
+
+// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
+// It prefers disks with fewer shards and more free slots
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+    if len(ecNode.disks) == 0 {
+        return 0 // No disk info available, let the server decide
+    }
+
+    var bestDiskId uint32
+    bestScore := -1
+
+    for diskId, disk := range ecNode.disks {
+        if disk.freeEcSlots <= 0 {
+            continue
+        }
+
+        // Check if this volume already has shards on this disk
+        existingShards := 0
+        if shardBits, ok := disk.ecShards[vid]; ok {
+            existingShards = shardBits.ShardIdCount()
+        }
+
+        // Score: prefer disks with fewer total shards and fewer shards of this volume
+        // Lower score is better
+        score := disk.ecShardCount*10 + existingShards*100
+
+        if bestScore == -1 || score < bestScore {
+            bestScore = score
+            bestDiskId = diskId
+        }
+    }
+
+    return bestDiskId
+}
+
+// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
+func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) {
+    node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations)
+    if err != nil {
+        return nil, 0, err
+    }
+
+    diskId := pickBestDiskOnNode(node, vid)
+    return node, diskId, nil
+}
+
 func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
-    destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
+    destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
     if err != nil {
-        fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
+        fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
         return nil
     }
 
-    fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
-    return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
+    if destDiskId > 0 {
+        fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", existingLocation.info.Id, vid, shardId, destNode.info.Id, destDiskId)
+    } else {
+        fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+    }
+    return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
 }
 
 func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
```
