author    Chris Lu <chrislusf@users.noreply.github.com>    2025-12-02 12:30:15 -0800
committer GitHub <noreply@github.com>    2025-12-02 12:30:15 -0800
commit    4f038820dc480a7374b928a17cc9121474ba4172 (patch)
tree      d15e1dade3537fbd462e819403b3347433ff37bd /weed/worker/tasks/erasure_coding/detection.go
parent    ebb06a3908990c31dcfb4995a69682d836228179 (diff)
Add disk-aware EC rebalancing (#7597)
* Add placement package for EC shard placement logic
  - Consolidate EC shard placement algorithm for reuse across shell and worker tasks
  - Support multi-pass selection: racks, then servers, then disks
  - Include proper spread verification and scoring functions
  - Comprehensive test coverage for various cluster topologies

* Make ec.balance disk-aware for multi-disk servers
  - Add EcDisk struct to track individual disks on volume servers
  - Update EcNode to maintain per-disk shard distribution
  - Parse disk_id from EC shard information during topology collection
  - Implement pickBestDiskOnNode() for selecting best disk per shard
  - Add diskDistributionScore() for tie-breaking node selection
  - Update all move operations to specify target disk in RPC calls
  - Improves shard balance within multi-disk servers, not just across servers

* Use placement package in EC detection for consistent disk-level placement
  - Replace custom EC disk selection logic with shared placement package
  - Convert topology DiskInfo to placement.DiskCandidate format
  - Use SelectDestinations() for multi-rack/server/disk spreading
  - Convert placement results back to topology DiskInfo for task creation
  - Ensures EC detection uses same placement logic as shell commands

* Make volume server evacuation disk-aware
  - Use pickBestDiskOnNode() when selecting evacuation target disk
  - Specify target disk in evacuation RPC requests
  - Maintains balanced disk distribution during server evacuations

* Rename PlacementConfig to PlacementRequest for clarity
  PlacementRequest better reflects that this is a request for placement rather than a configuration object. This improves API semantics.

* Rename DefaultConfig to DefaultPlacementRequest
  Aligns with the PlacementRequest type naming for consistency

* Address review comments from Gemini and CodeRabbit
  Fix HIGH issues:
  - Fix empty disk discovery: Now discovers all disks from VolumeInfos, not just from EC shards. This ensures disks without EC shards are still considered for placement.
  - Fix EC shard count calculation in detection.go: Now correctly filters by DiskId and sums actual shard counts using ShardBits.ShardIdCount() instead of just counting EcShardInfo entries.
  Fix MEDIUM issues:
  - Add disk ID to evacuation log messages for consistency with other logging
  - Remove unused serverToDisks variable in placement.go
  - Fix comment that incorrectly said 'ascending' when sorting is 'descending'

* add ec tests

* Update ec-integration-tests.yml

* Update ec_integration_test.go

* Fix EC integration tests CI: build weed binary and update actions
  - Add 'Build weed binary' step before running tests
  - Update actions/setup-go from v4 to v6 (Node20 compatibility)
  - Update actions/checkout from v2 to v4 (Node20 compatibility)
  - Move working-directory to test step only

* Add disk-aware EC rebalancing integration tests
  - Add TestDiskAwareECRebalancing test with multi-disk cluster setup
  - Test EC encode with disk awareness (shows disk ID in output)
  - Test EC balance with disk-level shard distribution
  - Add helper functions for disk-level verification:
    - startMultiDiskCluster: 3 servers x 4 disks each
    - countShardsPerDisk: track shards per disk per server
    - calculateDiskShardVariance: measure distribution balance
  - Verify no single disk is overloaded with shards
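
The last bullet above names countShardsPerDisk and calculateDiskShardVariance as test helpers; their actual signatures live in ec_integration_test.go and are not part of this diff, so the following is only a minimal, self-contained sketch of how such a per-disk variance check might look. The "server/diskID" map key and the sample distribution are assumptions for illustration, not the real test code.

package main

import "fmt"

// calculateDiskShardVariance returns the population variance of the per-disk
// shard counts. The input is assumed (hypothetically) to come from a helper
// like countShardsPerDisk, keyed by "server/diskID".
func calculateDiskShardVariance(shardsPerDisk map[string]int) float64 {
	if len(shardsPerDisk) == 0 {
		return 0
	}
	// Mean shard count per disk.
	total := 0
	for _, n := range shardsPerDisk {
		total += n
	}
	mean := float64(total) / float64(len(shardsPerDisk))

	// Sum of squared deviations from the mean.
	var sumSq float64
	for _, n := range shardsPerDisk {
		d := float64(n) - mean
		sumSq += d * d
	}
	return sumSq / float64(len(shardsPerDisk))
}

func main() {
	// Hypothetical distribution: 3 servers x 4 disks holding 14 EC shards.
	shards := map[string]int{
		"srv1/0": 2, "srv1/1": 1, "srv1/2": 1, "srv1/3": 1,
		"srv2/0": 1, "srv2/1": 1, "srv2/2": 1, "srv2/3": 1,
		"srv3/0": 2, "srv3/1": 1, "srv3/2": 1, "srv3/3": 1,
	}
	fmt.Printf("per-disk shard variance: %.3f\n", calculateDiskShardVariance(shards))
}

A low variance is what the test would treat as "no single disk is overloaded"; the exact threshold used in the real test is not shown here.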
Diffstat (limited to 'weed/worker/tasks/erasure_coding/detection.go')
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go | 130
1 file changed, 72 insertions(+), 58 deletions(-)
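
For context before reading the diff: detection now delegates disk selection to the shared placement package. The sketch below shows the call shape, assuming the types and fields exactly as they appear in the diff (placement.DiskCandidate, placement.PlacementRequest, placement.SelectDestinations); the node names, rack labels, and shard count are made up for illustration, and real callers build candidates from topology.DiskInfo via diskInfosToCandidates.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
)

func main() {
	// Candidate disks; in detection.go these are converted from topology.DiskInfo.
	candidates := []*placement.DiskCandidate{
		{NodeID: "vol1:8080", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10},
		{NodeID: "vol2:8080", DiskID: 1, DataCenter: "dc1", Rack: "rack2", FreeSlots: 8},
	}

	// Request mirrors the config built in selectBestECDestinations below.
	request := placement.PlacementRequest{
		ShardsNeeded:           14, // total EC shards to place (data + parity)
		MaxShardsPerServer:     0,  // no hard limit, but prefer spreading
		MaxShardsPerRack:       0,  // no hard limit, but prefer spreading
		PreferDifferentServers: true,
		PreferDifferentRacks:   true,
	}

	result, err := placement.SelectDestinations(candidates, request)
	if err != nil {
		fmt.Println("placement failed:", err)
		return
	}
	for _, disk := range result.SelectedDisks {
		fmt.Printf("shard -> %s disk %d (%s/%s)\n", disk.NodeID, disk.DiskID, disk.DataCenter, disk.Rack)
	}
}

The detection code additionally sets MaxTaskLoad from topology.MaxTaskLoadForECPlacement, as shown in the diff.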
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index cd74bed33..c5568fe26 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -429,85 +430,100 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era
}
// selectBestECDestinations selects multiple disks for EC shard placement with diversity
+// Uses the consolidated placement package for proper rack/server/disk spreading
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
if len(disks) == 0 {
return nil
}
- // Group disks by rack and DC for diversity
- rackGroups := make(map[string][]*topology.DiskInfo)
- for _, disk := range disks {
- rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
- rackGroups[rackKey] = append(rackGroups[rackKey], disk)
+ // Convert topology.DiskInfo to placement.DiskCandidate
+ candidates := diskInfosToCandidates(disks)
+ if len(candidates) == 0 {
+ return nil
}
- var selected []*topology.DiskInfo
- usedRacks := make(map[string]bool)
+ // Configure placement for EC shards
+ config := placement.PlacementRequest{
+ ShardsNeeded: shardsNeeded,
+ MaxShardsPerServer: 0, // No hard limit, but prefer spreading
+ MaxShardsPerRack: 0, // No hard limit, but prefer spreading
+ MaxTaskLoad: topology.MaxTaskLoadForECPlacement,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
- // First pass: select one disk from each rack for maximum diversity
- for rackKey, rackDisks := range rackGroups {
- if len(selected) >= shardsNeeded {
- break
- }
+ // Use the shared placement algorithm
+ result, err := placement.SelectDestinations(candidates, config)
+ if err != nil {
+ glog.V(2).Infof("EC placement failed: %v", err)
+ return nil
+ }
- // Select best disk from this rack
- bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
- if bestDisk != nil {
- selected = append(selected, bestDisk)
- usedRacks[rackKey] = true
+ // Convert back to topology.DiskInfo
+ return candidatesToDiskInfos(result.SelectedDisks, disks)
+}
+
+// diskInfosToCandidates converts topology.DiskInfo slice to placement.DiskCandidate slice
+func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate {
+ var candidates []*placement.DiskCandidate
+ for _, disk := range disks {
+ if disk.DiskInfo == nil {
+ continue
}
- }
- // Second pass: if we need more disks, select from racks we've already used
- if len(selected) < shardsNeeded {
- for _, disk := range disks {
- if len(selected) >= shardsNeeded {
- break
- }
+ // Calculate free slots (using default max if not set)
+ freeSlots := int(disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount)
+ if freeSlots < 0 {
+ freeSlots = 0
+ }
- // Skip if already selected
- alreadySelected := false
- for _, sel := range selected {
- if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
- alreadySelected = true
- break
+ // Calculate EC shard count for this specific disk
+ // EcShardInfos contains all shards, so we need to filter by DiskId and sum actual shard counts
+ ecShardCount := 0
+ if disk.DiskInfo.EcShardInfos != nil {
+ for _, shardInfo := range disk.DiskInfo.EcShardInfos {
+ if shardInfo.DiskId == disk.DiskID {
+ ecShardCount += erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIdCount()
}
}
-
- if !alreadySelected && isDiskSuitableForEC(disk) {
- selected = append(selected, disk)
- }
}
- }
- return selected
+ candidates = append(candidates, &placement.DiskCandidate{
+ NodeID: disk.NodeID,
+ DiskID: disk.DiskID,
+ DataCenter: disk.DataCenter,
+ Rack: disk.Rack,
+ VolumeCount: disk.DiskInfo.VolumeCount,
+ MaxVolumeCount: disk.DiskInfo.MaxVolumeCount,
+ ShardCount: ecShardCount,
+ FreeSlots: freeSlots,
+ LoadCount: disk.LoadCount,
+ })
+ }
+ return candidates
}
-// selectBestFromRack selects the best disk from a rack for EC placement
-func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
- if len(disks) == 0 {
- return nil
+// candidatesToDiskInfos converts placement results back to topology.DiskInfo
+func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks []*topology.DiskInfo) []*topology.DiskInfo {
+ // Create a map for quick lookup
+ diskMap := make(map[string]*topology.DiskInfo)
+ for _, disk := range originalDisks {
+ key := fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
+ diskMap[key] = disk
}
- var bestDisk *topology.DiskInfo
- bestScore := -1.0
-
- for _, disk := range disks {
- if !isDiskSuitableForEC(disk) {
- continue
- }
-
- score := calculateECScore(disk, sourceRack, sourceDC)
- if score > bestScore {
- bestScore = score
- bestDisk = disk
+ var result []*topology.DiskInfo
+ for _, candidate := range candidates {
+ key := fmt.Sprintf("%s:%d", candidate.NodeID, candidate.DiskID)
+ if disk, ok := diskMap[key]; ok {
+ result = append(result, disk)
}
}
-
- return bestDisk
+ return result
}
// calculateECScore calculates placement score for EC operations
+// Used for logging and plan metadata
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
if disk.DiskInfo == nil {
return 0.0
@@ -524,14 +540,12 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
// Consider current load (secondary factor)
score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
- // Note: We don't penalize placing shards on the same rack/DC as source
- // since the original volume will be deleted after EC conversion.
- // This allows for better network efficiency and storage utilization.
-
return score
}
// isDiskSuitableForEC checks if a disk is suitable for EC placement
+// Note: This is kept for backward compatibility but the placement package
+// handles filtering internally
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
if disk.DiskInfo == nil {
return false