Diffstat (limited to 'weed/worker/tasks/erasure_coding/detection.go')
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go  194
1 file changed, 117 insertions(+), 77 deletions(-)
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index ec632436f..cd74bed33 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -61,6 +61,8 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ glog.Infof("EC Detection: Volume %d meets all criteria, attempting to create task", metric.VolumeID)
+
// Generate task ID for ActiveTopology integration
taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())
@@ -79,11 +81,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Plan EC destinations if ActiveTopology is available
if clusterInfo.ActiveTopology != nil {
+ glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID)
multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
if err != nil {
glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
continue // Skip this volume if destination planning fails
}
+ glog.Infof("EC Detection: Successfully planned %d destinations for volume %d", len(multiPlan.Plans), metric.VolumeID)
// Calculate expected shard size for EC operation
// Each data shard will be approximately volumeSize / dataShards
@@ -100,23 +104,27 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
}
// Find all volume replica locations (server + disk) from topology
+ glog.Infof("EC Detection: Looking for replica locations for volume %d", metric.VolumeID)
replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
if len(replicaLocations) == 0 {
glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
continue
}
+ glog.Infof("EC Detection: Found %d replica locations for volume %d", len(replicaLocations), metric.VolumeID)
// Find existing EC shards from previous failed attempts
existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
// Combine volume replicas and existing EC shards for cleanup
- var allSourceLocations []topology.TaskSourceLocation
+ var sources []topology.TaskSourceSpec
// Add volume replicas (will free volume slots)
for _, replica := range replicaLocations {
- allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+ sources = append(sources, topology.TaskSourceSpec{
ServerID: replica.ServerID,
DiskID: replica.DiskID,
+ DataCenter: replica.DataCenter,
+ Rack: replica.Rack,
CleanupType: topology.CleanupVolumeReplica,
})
}
@@ -131,9 +139,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
for _, shard := range existingECShards {
key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
- allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+ sources = append(sources, topology.TaskSourceSpec{
ServerID: shard.ServerID,
DiskID: shard.DiskID,
+ DataCenter: shard.DataCenter,
+ Rack: shard.Rack,
CleanupType: topology.CleanupECShards,
})
duplicateCheck[key] = true
@@ -141,17 +151,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
}
glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
- len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations))
-
- // Convert TaskSourceLocation to TaskSourceSpec
- sources := make([]topology.TaskSourceSpec, len(allSourceLocations))
- for i, srcLoc := range allSourceLocations {
- sources[i] = topology.TaskSourceSpec{
- ServerID: srcLoc.ServerID,
- DiskID: srcLoc.DiskID,
- CleanupType: srcLoc.CleanupType,
- }
- }
+ len(replicaLocations), len(existingECShards), metric.VolumeID, len(sources))
// Convert shard destinations to TaskDestinationSpec
destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
@@ -180,27 +180,21 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
}
glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
- taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans))
-
- // Find all volume replicas from topology (for legacy worker compatibility)
- var replicas []string
- serverSet := make(map[string]struct{})
- for _, loc := range replicaLocations {
- if _, found := serverSet[loc.ServerID]; !found {
- replicas = append(replicas, loc.ServerID)
- serverSet[loc.ServerID] = struct{}{}
- }
- }
- glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
+ taskID, metric.VolumeID, len(sources), len(multiPlan.Plans))
- // Create typed parameters with EC destination information and replicas
+ // Create unified sources and targets for EC task
result.TypedParams = &worker_pb.TaskParams{
TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: metric.VolumeID,
- Server: metric.Server,
Collection: metric.Collection,
VolumeSize: metric.Size, // Store original volume size for tracking changes
- Replicas: replicas, // Include all volume replicas for deletion
+
+ // Unified sources - all sources that will be processed/cleaned up
+ Sources: convertTaskSourcesToProtobuf(sources, metric.VolumeID),
+
+ // Unified targets - all EC shard destinations
+ Targets: createECTargets(multiPlan),
+
TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
ErasureCodingParams: createECTaskParams(multiPlan),
},
@@ -213,6 +207,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
continue // Skip this volume if no topology available
}
+ glog.Infof("EC Detection: Successfully created EC task for volume %d, adding to results", metric.VolumeID)
results = append(results, result)
} else {
// Count debug reasons
@@ -283,7 +278,8 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
// Get available disks for EC placement with effective capacity consideration (includes pending tasks)
// For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1
// For EC, we need at least 1 available volume slot on a disk to consider it for placement.
- availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1)
+ // Note: We don't exclude the source server since the original volume will be deleted after EC conversion
+ availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 1)
if len(availableDisks) < erasure_coding.MinTotalDisks {
return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
}
@@ -306,7 +302,6 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
TargetDC: disk.DataCenter,
ExpectedSize: expectedShardSize, // Set calculated EC shard size
PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
- Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC),
}
plans = append(plans, plan)
@@ -340,32 +335,96 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
}, nil
}
-// createECTaskParams creates EC task parameters from the multi-destination plan
-func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
- var destinations []*worker_pb.ECDestination
-
- for _, plan := range multiPlan.Plans {
- destination := &worker_pb.ECDestination{
- Node: plan.TargetNode,
- DiskId: plan.TargetDisk,
- Rack: plan.TargetRack,
- DataCenter: plan.TargetDC,
- PlacementScore: plan.PlacementScore,
+// createECTargets creates unified TaskTarget structures from the multi-destination plan
+// with proper shard ID assignment during planning phase
+func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.TaskTarget {
+ var targets []*worker_pb.TaskTarget
+ numTargets := len(multiPlan.Plans)
+
+ // Create shard assignment arrays for each target (round-robin distribution)
+ targetShards := make([][]uint32, numTargets)
+ for i := range targetShards {
+ targetShards[i] = make([]uint32, 0)
+ }
+
+ // Distribute shards in round-robin fashion to spread both data and parity shards
+ // This ensures each target gets a mix of data shards (0-9) and parity shards (10-13)
+ for shardId := uint32(0); shardId < uint32(erasure_coding.TotalShardsCount); shardId++ {
+ targetIndex := int(shardId) % numTargets
+ targetShards[targetIndex] = append(targetShards[targetIndex], shardId)
+ }
+
+ // Create targets with assigned shard IDs
+ for i, plan := range multiPlan.Plans {
+ target := &worker_pb.TaskTarget{
+ Node: plan.TargetNode,
+ DiskId: plan.TargetDisk,
+ Rack: plan.TargetRack,
+ DataCenter: plan.TargetDC,
+ ShardIds: targetShards[i], // Round-robin assigned shards
+ EstimatedSize: plan.ExpectedSize,
+ }
+ targets = append(targets, target)
+
+ // Log shard assignment with data/parity classification
+ dataShards := make([]uint32, 0)
+ parityShards := make([]uint32, 0)
+ for _, shardId := range targetShards[i] {
+ if shardId < uint32(erasure_coding.DataShardsCount) {
+ dataShards = append(dataShards, shardId)
+ } else {
+ parityShards = append(parityShards, shardId)
+ }
}
- destinations = append(destinations, destination)
+ glog.V(2).Infof("EC planning: target %s assigned shards %v (data: %v, parity: %v)",
+ plan.TargetNode, targetShards[i], dataShards, parityShards)
}
- // Collect placement conflicts from all destinations
- var placementConflicts []string
- for _, plan := range multiPlan.Plans {
- placementConflicts = append(placementConflicts, plan.Conflicts...)
+ glog.V(1).Infof("EC planning: distributed %d shards across %d targets using round-robin (data shards 0-%d, parity shards %d-%d)",
+ erasure_coding.TotalShardsCount, numTargets,
+ erasure_coding.DataShardsCount-1, erasure_coding.DataShardsCount, erasure_coding.TotalShardsCount-1)
+ return targets
+}
+
+// convertTaskSourcesToProtobuf converts topology.TaskSourceSpec to worker_pb.TaskSource
+func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32) []*worker_pb.TaskSource {
+ var protobufSources []*worker_pb.TaskSource
+
+ for _, source := range sources {
+ pbSource := &worker_pb.TaskSource{
+ Node: source.ServerID,
+ DiskId: source.DiskID,
+ DataCenter: source.DataCenter,
+ Rack: source.Rack,
+ }
+
+ // Convert storage impact to estimated size
+ if source.EstimatedSize != nil {
+ pbSource.EstimatedSize = uint64(*source.EstimatedSize)
+ }
+
+ // Set appropriate volume ID or shard IDs based on cleanup type
+ switch source.CleanupType {
+ case topology.CleanupVolumeReplica:
+ // This is a volume replica, use the actual volume ID
+ pbSource.VolumeId = volumeID
+ case topology.CleanupECShards:
+ // These are EC shards; use the volume ID here as well for consistency
+ pbSource.VolumeId = volumeID
+ // Note: ShardIds would need to be passed separately if we need specific shard info
+ }
+
+ protobufSources = append(protobufSources, pbSource)
}
+ return protobufSources
+}
+
+// createECTaskParams creates clean EC task parameters (destinations now in unified targets)
+func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
return &worker_pb.ErasureCodingTaskParams{
- Destinations: destinations,
- DataShards: erasure_coding.DataShardsCount, // Standard data shards
- ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards
- PlacementConflicts: placementConflicts,
+ DataShards: erasure_coding.DataShardsCount, // Standard data shards
+ ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards
}
}
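For reference, a minimal standalone sketch (not part of the patch) of the round-robin assignment used in createECTargets above, assuming 4 hypothetical targets and the standard 10 data + 4 parity shard layout:

// Illustrative sketch only: how the round-robin loop spreads the 14 EC shards
// (data 0-9, parity 10-13) across 4 hypothetical targets so that every target
// receives a mix of data and parity shards.
package main

import "fmt"

func main() {
	const totalShards, dataShards = 14, 10 // mirrors erasure_coding.TotalShardsCount / DataShardsCount
	numTargets := 4                        // hypothetical number of destination disks

	targetShards := make([][]uint32, numTargets)
	for shardId := uint32(0); shardId < totalShards; shardId++ {
		targetIndex := int(shardId) % numTargets
		targetShards[targetIndex] = append(targetShards[targetIndex], shardId)
	}

	for i, shards := range targetShards {
		var data, parity []uint32
		for _, s := range shards {
			if s < dataShards {
				data = append(data, s)
			} else {
				parity = append(parity, s)
			}
		}
		// e.g. target 0: data=[0 4 8] parity=[12], target 1: data=[1 5 9] parity=[13]
		fmt.Printf("target %d: data=%v parity=%v\n", i, data, parity)
	}
}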
@@ -456,25 +515,19 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
score := 0.0
- // Prefer disks with available capacity
+ // Prefer disks with available capacity (primary factor)
if disk.DiskInfo.MaxVolumeCount > 0 {
utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
- score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity
+ score += (1.0 - utilization) * 60.0 // Up to 60 points for available capacity
}
- // Prefer different racks for better distribution
- if disk.Rack != sourceRack {
- score += 30.0
- }
-
- // Prefer different data centers for better distribution
- if disk.DataCenter != sourceDC {
- score += 20.0
- }
-
- // Consider current load
+ // Consider current load (secondary factor)
score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
+ // Note: We don't penalize placing shards on the same rack/DC as source
+ // since the original volume will be deleted after EC conversion.
+ // This allows for better network efficiency and storage utilization.
+
return score
}
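As a rough illustration of the reweighted score: a disk that is 40% full (40 of 100 volume slots used) with 3 active tasks would score (1.0 - 0.4) * 60 + (10 - 3) = 43, while a 90%-full disk with the same load would score only 6 + 7 = 13, so available capacity now dominates placement instead of rack/DC separation.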
@@ -492,19 +545,6 @@ func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
return true
}
-// checkECPlacementConflicts checks for placement rule conflicts in EC operations
-func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
- var conflicts []string
-
- // For EC, being on the same rack as source is often acceptable
- // but we note it as potential conflict for monitoring
- if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
- conflicts = append(conflicts, "same_rack_as_source")
- }
-
- return conflicts
-}
-
// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {