Diffstat (limited to 'weed/worker/tasks/erasure_coding/detection.go')
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go  130
1 file changed, 72 insertions(+), 58 deletions(-)
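
For context, below is a minimal usage sketch of the consolidated placement API that this change switches to. The package path, type names, struct fields, and the SelectDestinations call shape are taken from the hunks below; the concrete field types, example values, and the standalone main wrapper are assumptions for illustration only, not code from this commit.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
)

func main() {
	// Candidate disks, one entry per (server, disk); values are illustrative.
	candidates := []*placement.DiskCandidate{
		{NodeID: "vs1:8080", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 12},
		{NodeID: "vs2:8080", DiskID: 0, DataCenter: "dc1", Rack: "rack2", FreeSlots: 9},
		{NodeID: "vs3:8080", DiskID: 1, DataCenter: "dc2", Rack: "rack3", FreeSlots: 7},
	}

	// Request one destination per EC shard and prefer spreading across
	// servers and racks, mirroring the config built in selectBestECDestinations.
	config := placement.PlacementRequest{
		ShardsNeeded:           14, // e.g. 10 data + 4 parity shards
		PreferDifferentServers: true,
		PreferDifferentRacks:   true,
	}

	result, err := placement.SelectDestinations(candidates, config)
	if err != nil {
		fmt.Println("placement failed:", err)
		return
	}
	for _, disk := range result.SelectedDisks {
		fmt.Printf("shard destination: %s disk %d\n", disk.NodeID, disk.DiskID)
	}
}
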
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index cd74bed33..c5568fe26 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -429,85 +430,100 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era
}
// selectBestECDestinations selects multiple disks for EC shard placement with diversity
+// Uses the consolidated placement package for proper rack/server/disk spreading
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
if len(disks) == 0 {
return nil
}
- // Group disks by rack and DC for diversity
- rackGroups := make(map[string][]*topology.DiskInfo)
- for _, disk := range disks {
- rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
- rackGroups[rackKey] = append(rackGroups[rackKey], disk)
+ // Convert topology.DiskInfo to placement.DiskCandidate
+ candidates := diskInfosToCandidates(disks)
+ if len(candidates) == 0 {
+ return nil
}
- var selected []*topology.DiskInfo
- usedRacks := make(map[string]bool)
+ // Configure placement for EC shards
+ config := placement.PlacementRequest{
+ ShardsNeeded: shardsNeeded,
+ MaxShardsPerServer: 0, // No hard limit, but prefer spreading
+ MaxShardsPerRack: 0, // No hard limit, but prefer spreading
+ MaxTaskLoad: topology.MaxTaskLoadForECPlacement,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
- // First pass: select one disk from each rack for maximum diversity
- for rackKey, rackDisks := range rackGroups {
- if len(selected) >= shardsNeeded {
- break
- }
+ // Use the shared placement algorithm
+ result, err := placement.SelectDestinations(candidates, config)
+ if err != nil {
+ glog.V(2).Infof("EC placement failed: %v", err)
+ return nil
+ }
- // Select best disk from this rack
- bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
- if bestDisk != nil {
- selected = append(selected, bestDisk)
- usedRacks[rackKey] = true
+ // Convert back to topology.DiskInfo
+ return candidatesToDiskInfos(result.SelectedDisks, disks)
+}
+
+// diskInfosToCandidates converts topology.DiskInfo slice to placement.DiskCandidate slice
+func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate {
+ var candidates []*placement.DiskCandidate
+ for _, disk := range disks {
+ if disk.DiskInfo == nil {
+ continue
}
- }
- // Second pass: if we need more disks, select from racks we've already used
- if len(selected) < shardsNeeded {
- for _, disk := range disks {
- if len(selected) >= shardsNeeded {
- break
- }
+	// Calculate free volume slots (clamped at zero if the disk is over capacity)
+ freeSlots := int(disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount)
+ if freeSlots < 0 {
+ freeSlots = 0
+ }
- // Skip if already selected
- alreadySelected := false
- for _, sel := range selected {
- if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
- alreadySelected = true
- break
+ // Calculate EC shard count for this specific disk
+ // EcShardInfos contains all shards, so we need to filter by DiskId and sum actual shard counts
+ ecShardCount := 0
+ if disk.DiskInfo.EcShardInfos != nil {
+ for _, shardInfo := range disk.DiskInfo.EcShardInfos {
+ if shardInfo.DiskId == disk.DiskID {
+ ecShardCount += erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIdCount()
}
}
-
- if !alreadySelected && isDiskSuitableForEC(disk) {
- selected = append(selected, disk)
- }
}
- }
- return selected
+ candidates = append(candidates, &placement.DiskCandidate{
+ NodeID: disk.NodeID,
+ DiskID: disk.DiskID,
+ DataCenter: disk.DataCenter,
+ Rack: disk.Rack,
+ VolumeCount: disk.DiskInfo.VolumeCount,
+ MaxVolumeCount: disk.DiskInfo.MaxVolumeCount,
+ ShardCount: ecShardCount,
+ FreeSlots: freeSlots,
+ LoadCount: disk.LoadCount,
+ })
+ }
+ return candidates
}
-// selectBestFromRack selects the best disk from a rack for EC placement
-func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
- if len(disks) == 0 {
- return nil
+// candidatesToDiskInfos converts placement results back to topology.DiskInfo
+func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks []*topology.DiskInfo) []*topology.DiskInfo {
+ // Create a map for quick lookup
+ diskMap := make(map[string]*topology.DiskInfo)
+ for _, disk := range originalDisks {
+ key := fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
+ diskMap[key] = disk
}
- var bestDisk *topology.DiskInfo
- bestScore := -1.0
-
- for _, disk := range disks {
- if !isDiskSuitableForEC(disk) {
- continue
- }
-
- score := calculateECScore(disk, sourceRack, sourceDC)
- if score > bestScore {
- bestScore = score
- bestDisk = disk
+ var result []*topology.DiskInfo
+ for _, candidate := range candidates {
+ key := fmt.Sprintf("%s:%d", candidate.NodeID, candidate.DiskID)
+ if disk, ok := diskMap[key]; ok {
+ result = append(result, disk)
}
}
-
- return bestDisk
+ return result
}
// calculateECScore calculates placement score for EC operations
+// Used for logging and plan metadata
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
if disk.DiskInfo == nil {
return 0.0
@@ -524,14 +540,12 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
// Consider current load (secondary factor)
score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
- // Note: We don't penalize placing shards on the same rack/DC as source
- // since the original volume will be deleted after EC conversion.
- // This allows for better network efficiency and storage utilization.
-
return score
}
// isDiskSuitableForEC checks if a disk is suitable for EC placement
+// Note: This is kept for backward compatibility, but the placement package
+// now handles this filtering internally.
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
if disk.DiskInfo == nil {
return false