author    chrislu <chris.lu@gmail.com>  2025-08-02 02:22:41 -0700
committer chrislu <chris.lu@gmail.com>  2025-08-02 02:22:41 -0700
commit    396927d1ec11fb36f1741fdbf1da879c2438cdf3 (patch)
tree      1a4750be6a42420aad9cd2be5846577430687466
parent    3d4e8409a53cf8103c9b93e2fde13be8e8652a25 (diff)
use ec shard size
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go  72
1 file changed, 59 insertions(+), 13 deletions(-)
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index 1122d2721..b40298f81 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -61,15 +61,16 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ expectedShardSize := calculateExpectedShardSize(metric.Size)
result := &types.TaskDetectionResult{
TaskType: types.TaskTypeErasureCoding,
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
Priority: types.TaskPriorityLow, // EC is not urgent
- Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
+ Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB), expected shard size=%.1fMB",
metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
- float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
+ float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB, float64(expectedShardSize)/(1024*1024)),
ScheduleAt: now,
}
@@ -97,8 +98,8 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
},
}
- glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
- metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
+ glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs, expected shard size: %.2fMB",
+ metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs, float64(expectedShardSize)/(1024*1024))
} else {
glog.Warningf("No ActiveTopology available for destination planning in EC detection")
continue // Skip this volume if no topology available
@@ -143,6 +144,10 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// planECDestinations plans the destinations for erasure coding operation
// This function implements EC destination planning logic directly in the detection phase
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
+ // Calculate expected shard size based on original volume size
+ // EC data is split across data shards, so each shard gets roughly volume_size / data_shards
+ expectedShardSize := calculateExpectedShardSize(metric.Size)
+
// Get source node information from topology
var sourceRack, sourceDC string
@@ -178,9 +183,9 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
}
// Select best disks for EC placement with rack/DC diversity
- selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
- if len(selectedDisks) < minTotalDisks {
- return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), minTotalDisks)
+ selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount, expectedShardSize)
+ if len(selectedDisks) < erasure_coding.MinTotalDisks {
+ return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), erasure_coding.MinTotalDisks)
}
var plans []*topology.DestinationPlan
@@ -193,8 +198,8 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
TargetDisk: disk.DiskID,
TargetRack: disk.Rack,
TargetDC: disk.DataCenter,
- ExpectedSize: 0, // EC shards don't have predetermined size
- PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
+ ExpectedSize: expectedShardSize, // Use calculated shard size
+ PlacementScore: calculateECScoreWithSize(disk, sourceRack, sourceDC, expectedShardSize),
Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC),
}
plans = append(plans, plan)
@@ -205,6 +210,9 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
dcCount[disk.DataCenter]++
}
+ glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs",
+ metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount))
+
return &topology.MultiDestinationPlan{
Plans: plans,
TotalShards: len(plans),
@@ -243,7 +251,7 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era
}
// selectBestECDestinations selects multiple disks for EC shard placement with diversity
-func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
+func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int, expectedShardSize uint64) []*topology.DiskInfo {
if len(disks) == 0 {
return nil
}
@@ -265,7 +273,7 @@ func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC s
}
// Select best disk from this rack
- bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
+ bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC, expectedShardSize)
if bestDisk != nil {
selected = append(selected, bestDisk)
usedRacks[rackKey] = true
@@ -298,7 +306,7 @@ func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC s
}
// selectBestFromRack selects the best disk from a rack for EC placement
-func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
+func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string, expectedShardSize uint64) *topology.DiskInfo {
if len(disks) == 0 {
return nil
}
@@ -311,7 +319,7 @@ func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string)
continue
}
- score := calculateECScore(disk, sourceRack, sourceDC)
+ score := calculateECScoreWithSize(disk, sourceRack, sourceDC, expectedShardSize)
if score > bestScore {
bestScore = score
bestDisk = disk
@@ -351,6 +359,24 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
return score
}
+// calculateECScoreWithSize calculates placement score for EC operations with shard size consideration
+func calculateECScoreWithSize(disk *topology.DiskInfo, sourceRack, sourceDC string, expectedShardSize uint64) float64 {
+ baseScore := calculateECScore(disk, sourceRack, sourceDC)
+
+ // Additional scoring based on available space vs expected shard size
+ if disk.DiskInfo != nil && expectedShardSize > 0 {
+ // Estimate available space (this is a rough estimate)
+ availableSlots := disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount
+ if availableSlots > 0 {
+ // Bonus for having plenty of space for the expected shard
+ // This is a heuristic - each volume slot can theoretically hold any size
+ baseScore += float64(availableSlots) * 2.0 // Up to 2 points per available slot
+ }
+ }
+
+ return baseScore
+}
+
// isDiskSuitableForEC checks if a disk is suitable for EC placement
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
if disk.DiskInfo == nil {
@@ -414,3 +440,23 @@ func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32
return replicaServers
}
+
+// calculateExpectedShardSize calculates the expected size of each EC shard based on the original volume size
+func calculateExpectedShardSize(originalVolumeSize uint64) uint64 {
+ if originalVolumeSize == 0 {
+ return 0
+ }
+
+ // In erasure coding, the original data is split across data shards
+ // Each data shard gets approximately originalSize / dataShards
+ // Parity shards are similar in size to data shards
+ // Add some overhead for padding and metadata (typically ~5-10%)
+ baseShardSize := originalVolumeSize / uint64(erasure_coding.DataShardsCount)
+ overhead := baseShardSize / 10 // 10% overhead for padding and metadata
+ expectedShardSize := baseShardSize + overhead
+
+ glog.V(2).Infof("Calculated expected shard size: original=%d bytes, base_shard=%d bytes, with_overhead=%d bytes",
+ originalVolumeSize, baseShardSize, expectedShardSize)
+
+ return expectedShardSize
+}
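
For reference, a minimal standalone sketch of the shard-size estimate introduced by this patch. It assumes 10 data shards, the usual SeaweedFS value for erasure_coding.DataShardsCount; the dataShards constant and the example volume size below are illustrative and not taken from the commit.

package main

import "fmt"

// dataShards stands in for erasure_coding.DataShardsCount; 10 is assumed here.
const dataShards = 10

// expectedShardSize reproduces the estimate from the patch: each shard holds
// roughly originalSize / dataShards, plus ~10% overhead for padding and metadata.
func expectedShardSize(originalVolumeSize uint64) uint64 {
	if originalVolumeSize == 0 {
		return 0
	}
	base := originalVolumeSize / dataShards
	overhead := base / 10 // 10% overhead, as in the patch
	return base + overhead
}

func main() {
	// Worked example: a 30 GiB volume yields a ~3.3 GiB expected shard.
	volume := uint64(30) << 30         // 32212254720 bytes
	shard := expectedShardSize(volume) // 3 GiB base + 10% overhead
	fmt.Printf("volume=%d bytes, expected shard=%d bytes (%.2f MiB)\n",
		volume, shard, float64(shard)/(1024*1024))
}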
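
Similarly, a hedged sketch of the size-aware scoring added in calculateECScoreWithSize. The diskStats struct, the baseScore argument, and the scoreWithSize helper are stand-ins for topology.DiskInfo and calculateECScore, which the diff does not show in full; only the slot-based bonus (2 points per free volume slot when an expected shard size is known) mirrors the patch.

package main

import "fmt"

// diskStats is a hypothetical stand-in for the volume counters carried by
// topology.DiskInfo; only the fields used by the bonus are modeled.
type diskStats struct {
	MaxVolumeCount int64
	VolumeCount    int64
}

// scoreWithSize starts from a base placement score and, when an expected
// shard size is known, adds 2 points per free volume slot on the disk.
func scoreWithSize(baseScore float64, d diskStats, expectedShardSize uint64) float64 {
	if expectedShardSize == 0 {
		return baseScore
	}
	availableSlots := d.MaxVolumeCount - d.VolumeCount
	if availableSlots > 0 {
		baseScore += float64(availableSlots) * 2.0
	}
	return baseScore
}

func main() {
	d := diskStats{MaxVolumeCount: 8, VolumeCount: 5}
	// Three free slots add a bonus of 6.0 on top of the base score of 10.0.
	fmt.Println(scoreWithSize(10.0, d, 3543348019)) // prints 16
}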