diff options
Diffstat (limited to 'weed/worker/tasks/erasure_coding')
| -rw-r--r-- | weed/worker/tasks/erasure_coding/detection.go | 194 | ||||
| -rw-r--r-- | weed/worker/tasks/erasure_coding/ec_task.go | 368 | ||||
| -rw-r--r-- | weed/worker/tasks/erasure_coding/register.go | 5 |
3 files changed, 368 insertions, 199 deletions
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index ec632436f..cd74bed33 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -61,6 +61,8 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Check quiet duration and fullness criteria if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { + glog.Infof("EC Detection: Volume %d meets all criteria, attempting to create task", metric.VolumeID) + // Generate task ID for ActiveTopology integration taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix()) @@ -79,11 +81,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Plan EC destinations if ActiveTopology is available if clusterInfo.ActiveTopology != nil { + glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID) multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig) if err != nil { glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err) continue // Skip this volume if destination planning fails } + glog.Infof("EC Detection: Successfully planned %d destinations for volume %d", len(multiPlan.Plans), metric.VolumeID) // Calculate expected shard size for EC operation // Each data shard will be approximately volumeSize / dataShards @@ -100,23 +104,27 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } // Find all volume replica locations (server + disk) from topology + glog.Infof("EC Detection: Looking for replica locations for volume %d", metric.VolumeID) replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection) if len(replicaLocations) == 0 { glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID) continue } + glog.Infof("EC Detection: Found %d replica locations for volume %d", len(replicaLocations), metric.VolumeID) // Find existing EC shards from previous failed attempts existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection) // Combine volume replicas and existing EC shards for cleanup - var allSourceLocations []topology.TaskSourceLocation + var sources []topology.TaskSourceSpec // Add volume replicas (will free volume slots) for _, replica := range replicaLocations { - allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{ + sources = append(sources, topology.TaskSourceSpec{ ServerID: replica.ServerID, DiskID: replica.DiskID, + DataCenter: replica.DataCenter, + Rack: replica.Rack, CleanupType: topology.CleanupVolumeReplica, }) } @@ -131,9 +139,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI for _, shard := range existingECShards { key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID) if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas - allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{ + sources = append(sources, topology.TaskSourceSpec{ ServerID: shard.ServerID, DiskID: shard.DiskID, + DataCenter: shard.DataCenter, + Rack: shard.Rack, CleanupType: topology.CleanupECShards, }) duplicateCheck[key] = true @@ -141,17 +151,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)", - len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations)) - - // Convert TaskSourceLocation to TaskSourceSpec - sources := make([]topology.TaskSourceSpec, len(allSourceLocations)) - for i, srcLoc := range allSourceLocations { - sources[i] = topology.TaskSourceSpec{ - ServerID: srcLoc.ServerID, - DiskID: srcLoc.DiskID, - CleanupType: srcLoc.CleanupType, - } - } + len(replicaLocations), len(existingECShards), metric.VolumeID, len(sources)) // Convert shard destinations to TaskDestinationSpec destinations := make([]topology.TaskDestinationSpec, len(shardDestinations)) @@ -180,27 +180,21 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations", - taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans)) - - // Find all volume replicas from topology (for legacy worker compatibility) - var replicas []string - serverSet := make(map[string]struct{}) - for _, loc := range replicaLocations { - if _, found := serverSet[loc.ServerID]; !found { - replicas = append(replicas, loc.ServerID) - serverSet[loc.ServerID] = struct{}{} - } - } - glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas) + taskID, metric.VolumeID, len(sources), len(multiPlan.Plans)) - // Create typed parameters with EC destination information and replicas + // Create unified sources and targets for EC task result.TypedParams = &worker_pb.TaskParams{ TaskId: taskID, // Link to ActiveTopology pending task VolumeId: metric.VolumeID, - Server: metric.Server, Collection: metric.Collection, VolumeSize: metric.Size, // Store original volume size for tracking changes - Replicas: replicas, // Include all volume replicas for deletion + + // Unified sources - all sources that will be processed/cleaned up + Sources: convertTaskSourcesToProtobuf(sources, metric.VolumeID), + + // Unified targets - all EC shard destinations + Targets: createECTargets(multiPlan), + TaskParams: &worker_pb.TaskParams_ErasureCodingParams{ ErasureCodingParams: createECTaskParams(multiPlan), }, @@ -213,6 +207,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI continue // Skip this volume if no topology available } + glog.Infof("EC Detection: Successfully created EC task for volume %d, adding to results", metric.VolumeID) results = append(results, result) } else { // Count debug reasons @@ -283,7 +278,8 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V // Get available disks for EC placement with effective capacity consideration (includes pending tasks) // For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1 // For EC, we need at least 1 available volume slot on a disk to consider it for placement. - availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1) + // Note: We don't exclude the source server since the original volume will be deleted after EC conversion + availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 1) if len(availableDisks) < erasure_coding.MinTotalDisks { return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks)) } @@ -306,7 +302,6 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V TargetDC: disk.DataCenter, ExpectedSize: expectedShardSize, // Set calculated EC shard size PlacementScore: calculateECScore(disk, sourceRack, sourceDC), - Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC), } plans = append(plans, plan) @@ -340,32 +335,96 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V }, nil } -// createECTaskParams creates EC task parameters from the multi-destination plan -func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams { - var destinations []*worker_pb.ECDestination - - for _, plan := range multiPlan.Plans { - destination := &worker_pb.ECDestination{ - Node: plan.TargetNode, - DiskId: plan.TargetDisk, - Rack: plan.TargetRack, - DataCenter: plan.TargetDC, - PlacementScore: plan.PlacementScore, +// createECTargets creates unified TaskTarget structures from the multi-destination plan +// with proper shard ID assignment during planning phase +func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.TaskTarget { + var targets []*worker_pb.TaskTarget + numTargets := len(multiPlan.Plans) + + // Create shard assignment arrays for each target (round-robin distribution) + targetShards := make([][]uint32, numTargets) + for i := range targetShards { + targetShards[i] = make([]uint32, 0) + } + + // Distribute shards in round-robin fashion to spread both data and parity shards + // This ensures each target gets a mix of data shards (0-9) and parity shards (10-13) + for shardId := uint32(0); shardId < uint32(erasure_coding.TotalShardsCount); shardId++ { + targetIndex := int(shardId) % numTargets + targetShards[targetIndex] = append(targetShards[targetIndex], shardId) + } + + // Create targets with assigned shard IDs + for i, plan := range multiPlan.Plans { + target := &worker_pb.TaskTarget{ + Node: plan.TargetNode, + DiskId: plan.TargetDisk, + Rack: plan.TargetRack, + DataCenter: plan.TargetDC, + ShardIds: targetShards[i], // Round-robin assigned shards + EstimatedSize: plan.ExpectedSize, + } + targets = append(targets, target) + + // Log shard assignment with data/parity classification + dataShards := make([]uint32, 0) + parityShards := make([]uint32, 0) + for _, shardId := range targetShards[i] { + if shardId < uint32(erasure_coding.DataShardsCount) { + dataShards = append(dataShards, shardId) + } else { + parityShards = append(parityShards, shardId) + } } - destinations = append(destinations, destination) + glog.V(2).Infof("EC planning: target %s assigned shards %v (data: %v, parity: %v)", + plan.TargetNode, targetShards[i], dataShards, parityShards) } - // Collect placement conflicts from all destinations - var placementConflicts []string - for _, plan := range multiPlan.Plans { - placementConflicts = append(placementConflicts, plan.Conflicts...) + glog.V(1).Infof("EC planning: distributed %d shards across %d targets using round-robin (data shards 0-%d, parity shards %d-%d)", + erasure_coding.TotalShardsCount, numTargets, + erasure_coding.DataShardsCount-1, erasure_coding.DataShardsCount, erasure_coding.TotalShardsCount-1) + return targets +} + +// convertTaskSourcesToProtobuf converts topology.TaskSourceSpec to worker_pb.TaskSource +func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32) []*worker_pb.TaskSource { + var protobufSources []*worker_pb.TaskSource + + for _, source := range sources { + pbSource := &worker_pb.TaskSource{ + Node: source.ServerID, + DiskId: source.DiskID, + DataCenter: source.DataCenter, + Rack: source.Rack, + } + + // Convert storage impact to estimated size + if source.EstimatedSize != nil { + pbSource.EstimatedSize = uint64(*source.EstimatedSize) + } + + // Set appropriate volume ID or shard IDs based on cleanup type + switch source.CleanupType { + case topology.CleanupVolumeReplica: + // This is a volume replica, use the actual volume ID + pbSource.VolumeId = volumeID + case topology.CleanupECShards: + // This is EC shards, also use the volume ID for consistency + pbSource.VolumeId = volumeID + // Note: ShardIds would need to be passed separately if we need specific shard info + } + + protobufSources = append(protobufSources, pbSource) } + return protobufSources +} + +// createECTaskParams creates clean EC task parameters (destinations now in unified targets) +func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams { return &worker_pb.ErasureCodingTaskParams{ - Destinations: destinations, - DataShards: erasure_coding.DataShardsCount, // Standard data shards - ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards - PlacementConflicts: placementConflicts, + DataShards: erasure_coding.DataShardsCount, // Standard data shards + ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards } } @@ -456,25 +515,19 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa score := 0.0 - // Prefer disks with available capacity + // Prefer disks with available capacity (primary factor) if disk.DiskInfo.MaxVolumeCount > 0 { utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount) - score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity + score += (1.0 - utilization) * 60.0 // Up to 60 points for available capacity } - // Prefer different racks for better distribution - if disk.Rack != sourceRack { - score += 30.0 - } - - // Prefer different data centers for better distribution - if disk.DataCenter != sourceDC { - score += 20.0 - } - - // Consider current load + // Consider current load (secondary factor) score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load + // Note: We don't penalize placing shards on the same rack/DC as source + // since the original volume will be deleted after EC conversion. + // This allows for better network efficiency and storage utilization. + return score } @@ -492,19 +545,6 @@ func isDiskSuitableForEC(disk *topology.DiskInfo) bool { return true } -// checkECPlacementConflicts checks for placement rule conflicts in EC operations -func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string { - var conflicts []string - - // For EC, being on the same rack as source is often acceptable - // but we note it as potential conflict for monitoring - if disk.Rack == sourceRack && disk.DataCenter == sourceDC { - conflicts = append(conflicts, "same_rack_as_source") - } - - return conflicts -} - // findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume // Uses O(1) indexed lookup for optimal performance on large clusters. func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica { diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go index 97332f63f..18f192bc9 100644 --- a/weed/worker/tasks/erasure_coding/ec_task.go +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -7,7 +7,6 @@ import ( "math" "os" "path/filepath" - "sort" "strings" "time" @@ -36,9 +35,9 @@ type ErasureCodingTask struct { // EC parameters dataShards int32 parityShards int32 - destinations []*worker_pb.ECDestination - shardAssignment map[string][]string // destination -> assigned shard types - replicas []string // volume replica servers for deletion + targets []*worker_pb.TaskTarget // Unified targets for EC shards + sources []*worker_pb.TaskSource // Unified sources for cleanup + shardAssignment map[string][]string // destination -> assigned shard types } // NewErasureCodingTask creates a new unified EC task instance @@ -67,18 +66,43 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP t.dataShards = ecParams.DataShards t.parityShards = ecParams.ParityShards t.workDir = ecParams.WorkingDir - t.destinations = ecParams.Destinations - t.replicas = params.Replicas // Get replicas from task parameters + t.targets = params.Targets // Get unified targets + t.sources = params.Sources // Get unified sources + // Log detailed task information t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, "server": t.server, "collection": t.collection, "data_shards": t.dataShards, "parity_shards": t.parityShards, - "destinations": len(t.destinations), + "total_shards": t.dataShards + t.parityShards, + "targets": len(t.targets), + "sources": len(t.sources), }).Info("Starting erasure coding task") + // Log detailed target server assignments + for i, target := range t.targets { + t.GetLogger().WithFields(map[string]interface{}{ + "target_index": i, + "server": target.Node, + "shard_ids": target.ShardIds, + "shard_count": len(target.ShardIds), + }).Info("Target server shard assignment") + } + + // Log source information + for i, source := range t.sources { + t.GetLogger().WithFields(map[string]interface{}{ + "source_index": i, + "server": source.Node, + "volume_id": source.VolumeId, + "disk_id": source.DiskId, + "rack": source.Rack, + "data_center": source.DataCenter, + }).Info("Source server information") + } + // Use the working directory from task parameters, or fall back to a default baseWorkDir := t.workDir @@ -112,14 +136,14 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP }() // Step 1: Mark volume readonly - t.ReportProgress(10.0) + t.ReportProgressWithStage(10.0, "Marking volume readonly") t.GetLogger().Info("Marking volume readonly") if err := t.markVolumeReadonly(); err != nil { return fmt.Errorf("failed to mark volume readonly: %v", err) } // Step 2: Copy volume files to worker - t.ReportProgress(25.0) + t.ReportProgressWithStage(25.0, "Copying volume files to worker") t.GetLogger().Info("Copying volume files to worker") localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir) if err != nil { @@ -127,7 +151,7 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP } // Step 3: Generate EC shards locally - t.ReportProgress(40.0) + t.ReportProgressWithStage(40.0, "Generating EC shards locally") t.GetLogger().Info("Generating EC shards locally") shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir) if err != nil { @@ -135,27 +159,27 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP } // Step 4: Distribute shards to destinations - t.ReportProgress(60.0) + t.ReportProgressWithStage(60.0, "Distributing EC shards to destinations") t.GetLogger().Info("Distributing EC shards to destinations") if err := t.distributeEcShards(shardFiles); err != nil { return fmt.Errorf("failed to distribute EC shards: %v", err) } // Step 5: Mount EC shards - t.ReportProgress(80.0) + t.ReportProgressWithStage(80.0, "Mounting EC shards") t.GetLogger().Info("Mounting EC shards") if err := t.mountEcShards(); err != nil { return fmt.Errorf("failed to mount EC shards: %v", err) } // Step 6: Delete original volume - t.ReportProgress(90.0) + t.ReportProgressWithStage(90.0, "Deleting original volume") t.GetLogger().Info("Deleting original volume") if err := t.deleteOriginalVolume(); err != nil { return fmt.Errorf("failed to delete original volume: %v", err) } - t.ReportProgress(100.0) + t.ReportProgressWithStage(100.0, "EC processing complete") glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed", t.volumeID, t.server, len(shardFiles)) @@ -177,8 +201,16 @@ func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error { return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId) } - if params.Server != t.server { - return fmt.Errorf("source server mismatch: expected %s, got %s", t.server, params.Server) + // Validate that at least one source matches our server + found := false + for _, source := range params.Sources { + if source.Node == t.server { + found = true + break + } + } + if !found { + return fmt.Errorf("no source matches expected server %s", t.server) } if ecParams.DataShards < 1 { @@ -189,8 +221,8 @@ func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error { return fmt.Errorf("invalid parity shards: %d (must be >= 1)", ecParams.ParityShards) } - if len(ecParams.Destinations) < int(ecParams.DataShards+ecParams.ParityShards) { - return fmt.Errorf("insufficient destinations: got %d, need %d", len(ecParams.Destinations), ecParams.DataShards+ecParams.ParityShards) + if len(params.Targets) < int(ecParams.DataShards+ecParams.ParityShards) { + return fmt.Errorf("insufficient targets: got %d, need %d", len(params.Targets), ecParams.DataShards+ecParams.ParityShards) } return nil @@ -224,6 +256,12 @@ func (t *ErasureCodingTask) markVolumeReadonly() error { func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) { localFiles := make(map[string]string) + t.GetLogger().WithFields(map[string]interface{}{ + "volume_id": t.volumeID, + "source": t.server, + "working_dir": workDir, + }).Info("Starting volume file copy from source server") + // Copy .dat file datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID)) if err := t.copyFileFromSource(".dat", datFile); err != nil { @@ -231,6 +269,16 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string] } localFiles["dat"] = datFile + // Log .dat file size + if info, err := os.Stat(datFile); err == nil { + t.GetLogger().WithFields(map[string]interface{}{ + "file_type": ".dat", + "file_path": datFile, + "size_bytes": info.Size(), + "size_mb": float64(info.Size()) / (1024 * 1024), + }).Info("Volume data file copied successfully") + } + // Copy .idx file idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID)) if err := t.copyFileFromSource(".idx", idxFile); err != nil { @@ -238,6 +286,16 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string] } localFiles["idx"] = idxFile + // Log .idx file size + if info, err := os.Stat(idxFile); err == nil { + t.GetLogger().WithFields(map[string]interface{}{ + "file_type": ".idx", + "file_path": idxFile, + "size_bytes": info.Size(), + "size_mb": float64(info.Size()) / (1024 * 1024), + }).Info("Volume index file copied successfully") + } + return localFiles, nil } @@ -312,18 +370,38 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string return nil, fmt.Errorf("failed to generate .ecx file: %v", err) } - // Collect generated shard file paths + // Collect generated shard file paths and log details + var generatedShards []string + var totalShardSize int64 + for i := 0; i < erasure_coding.TotalShardsCount; i++ { shardFile := fmt.Sprintf("%s.ec%02d", baseName, i) - if _, err := os.Stat(shardFile); err == nil { - shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile + if info, err := os.Stat(shardFile); err == nil { + shardKey := fmt.Sprintf("ec%02d", i) + shardFiles[shardKey] = shardFile + generatedShards = append(generatedShards, shardKey) + totalShardSize += info.Size() + + // Log individual shard details + t.GetLogger().WithFields(map[string]interface{}{ + "shard_id": i, + "shard_type": shardKey, + "file_path": shardFile, + "size_bytes": info.Size(), + "size_kb": float64(info.Size()) / 1024, + }).Info("EC shard generated") } } // Add metadata files ecxFile := baseName + ".ecx" - if _, err := os.Stat(ecxFile); err == nil { + if info, err := os.Stat(ecxFile); err == nil { shardFiles["ecx"] = ecxFile + t.GetLogger().WithFields(map[string]interface{}{ + "file_type": "ecx", + "file_path": ecxFile, + "size_bytes": info.Size(), + }).Info("EC index file generated") } // Generate .vif file (volume info) @@ -335,26 +413,67 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string glog.Warningf("Failed to create .vif file: %v", err) } else { shardFiles["vif"] = vifFile + if info, err := os.Stat(vifFile); err == nil { + t.GetLogger().WithFields(map[string]interface{}{ + "file_type": "vif", + "file_path": vifFile, + "size_bytes": info.Size(), + }).Info("Volume info file generated") + } } - glog.V(1).Infof("Generated %d EC files locally", len(shardFiles)) + // Log summary of generation + t.GetLogger().WithFields(map[string]interface{}{ + "total_files": len(shardFiles), + "ec_shards": len(generatedShards), + "generated_shards": generatedShards, + "total_shard_size_mb": float64(totalShardSize) / (1024 * 1024), + }).Info("EC shard generation completed") return shardFiles, nil } // distributeEcShards distributes locally generated EC shards to destination servers +// using pre-assigned shard IDs from planning phase func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error { - if len(t.destinations) == 0 { - return fmt.Errorf("no destinations specified for EC shard distribution") + if len(t.targets) == 0 { + return fmt.Errorf("no targets specified for EC shard distribution") } if len(shardFiles) == 0 { return fmt.Errorf("no shard files available for distribution") } - // Create shard assignment: assign specific shards to specific destinations - shardAssignment := t.createShardAssignment(shardFiles) + // Build shard assignment from pre-assigned target shard IDs (from planning phase) + shardAssignment := make(map[string][]string) + + for _, target := range t.targets { + if len(target.ShardIds) == 0 { + continue // Skip targets with no assigned shards + } + + var assignedShards []string + + // Convert shard IDs to shard file names (e.g., 0 → "ec00", 1 → "ec01") + for _, shardId := range target.ShardIds { + shardType := fmt.Sprintf("ec%02d", shardId) + assignedShards = append(assignedShards, shardType) + } + + // Add metadata files (.ecx, .vif) to targets that have shards + if len(assignedShards) > 0 { + if _, hasEcx := shardFiles["ecx"]; hasEcx { + assignedShards = append(assignedShards, "ecx") + } + if _, hasVif := shardFiles["vif"]; hasVif { + assignedShards = append(assignedShards, "vif") + } + } + + shardAssignment[target.Node] = assignedShards + } + if len(shardAssignment) == 0 { - return fmt.Errorf("failed to create shard assignment") + return fmt.Errorf("no shard assignments found from planning phase") } // Store assignment for use during mounting @@ -365,100 +484,50 @@ func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) err t.GetLogger().WithFields(map[string]interface{}{ "destination": destNode, "assigned_shards": len(assignedShards), - "shard_ids": assignedShards, - }).Info("Distributing assigned EC shards to destination") + "shard_types": assignedShards, + }).Info("Starting shard distribution to destination server") // Send only the assigned shards to this destination + var transferredBytes int64 for _, shardType := range assignedShards { filePath, exists := shardFiles[shardType] if !exists { return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode) } + // Log file size before transfer + if info, err := os.Stat(filePath); err == nil { + transferredBytes += info.Size() + t.GetLogger().WithFields(map[string]interface{}{ + "destination": destNode, + "shard_type": shardType, + "file_path": filePath, + "size_bytes": info.Size(), + "size_kb": float64(info.Size()) / 1024, + }).Info("Starting shard file transfer") + } + if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil { return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err) } - } - } - glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment)) - return nil -} - -// createShardAssignment assigns specific EC shards to specific destination servers -// Each destination gets a subset of shards based on availability and placement rules -func (t *ErasureCodingTask) createShardAssignment(shardFiles map[string]string) map[string][]string { - assignment := make(map[string][]string) - - // Collect all available EC shards (ec00-ec13) - var availableShards []string - for shardType := range shardFiles { - if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 { - availableShards = append(availableShards, shardType) - } - } - - // Sort shards for consistent assignment - sort.Strings(availableShards) - - if len(availableShards) == 0 { - glog.Warningf("No EC shards found for assignment") - return assignment - } - - // Calculate shards per destination - numDestinations := len(t.destinations) - if numDestinations == 0 { - return assignment - } - - // Strategy: Distribute shards as evenly as possible across destinations - // With 14 shards and N destinations, some destinations get ⌈14/N⌉ shards, others get ⌊14/N⌋ - shardsPerDest := len(availableShards) / numDestinations - extraShards := len(availableShards) % numDestinations - - shardIndex := 0 - for i, dest := range t.destinations { - var destShards []string - - // Assign base number of shards - shardsToAssign := shardsPerDest - - // Assign one extra shard to first 'extraShards' destinations - if i < extraShards { - shardsToAssign++ - } - - // Assign the shards - for j := 0; j < shardsToAssign && shardIndex < len(availableShards); j++ { - destShards = append(destShards, availableShards[shardIndex]) - shardIndex++ + t.GetLogger().WithFields(map[string]interface{}{ + "destination": destNode, + "shard_type": shardType, + }).Info("Shard file transfer completed") } - assignment[dest.Node] = destShards - - glog.V(2).Infof("Assigned shards %v to destination %s", destShards, dest.Node) - } - - // Assign metadata files (.ecx, .vif) to each destination that has shards - // Note: .ecj files are created during mount, not during initial generation - for destNode, destShards := range assignment { - if len(destShards) > 0 { - // Add .ecx file if available - if _, hasEcx := shardFiles["ecx"]; hasEcx { - assignment[destNode] = append(assignment[destNode], "ecx") - } - - // Add .vif file if available - if _, hasVif := shardFiles["vif"]; hasVif { - assignment[destNode] = append(assignment[destNode], "vif") - } - - glog.V(2).Infof("Assigned metadata files (.ecx, .vif) to destination %s", destNode) - } + // Log summary for this destination + t.GetLogger().WithFields(map[string]interface{}{ + "destination": destNode, + "shards_transferred": len(assignedShards), + "total_bytes": transferredBytes, + "total_mb": float64(transferredBytes) / (1024 * 1024), + }).Info("All shards distributed to destination server") } - return assignment + glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment)) + return nil } // sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API @@ -565,6 +634,8 @@ func (t *ErasureCodingTask) mountEcShards() error { for destNode, assignedShards := range t.shardAssignment { // Convert shard names to shard IDs for mounting var shardIds []uint32 + var metadataFiles []string + for _, shardType := range assignedShards { // Skip metadata files (.ecx, .vif) - only mount EC shards if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 { @@ -573,16 +644,26 @@ func (t *ErasureCodingTask) mountEcShards() error { if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err == nil { shardIds = append(shardIds, shardId) } + } else { + metadataFiles = append(metadataFiles, shardType) } } + t.GetLogger().WithFields(map[string]interface{}{ + "destination": destNode, + "shard_ids": shardIds, + "shard_count": len(shardIds), + "metadata_files": metadataFiles, + }).Info("Starting EC shard mount operation") + if len(shardIds) == 0 { - glog.V(1).Infof("No EC shards to mount on %s (only metadata files)", destNode) + t.GetLogger().WithFields(map[string]interface{}{ + "destination": destNode, + "metadata_files": metadataFiles, + }).Info("No EC shards to mount (only metadata files)") continue } - glog.V(1).Infof("Mounting shards %v on %s", shardIds, destNode) - err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(), func(client volume_server_pb.VolumeServerClient) error { _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ @@ -594,9 +675,18 @@ func (t *ErasureCodingTask) mountEcShards() error { }) if err != nil { - glog.Warningf("Failed to mount shards %v on %s: %v", shardIds, destNode, err) + t.GetLogger().WithFields(map[string]interface{}{ + "destination": destNode, + "shard_ids": shardIds, + "error": err.Error(), + }).Error("Failed to mount EC shards") } else { - glog.V(1).Infof("Successfully mounted EC shards %v on %s", shardIds, destNode) + t.GetLogger().WithFields(map[string]interface{}{ + "destination": destNode, + "shard_ids": shardIds, + "volume_id": t.volumeID, + "collection": t.collection, + }).Info("Successfully mounted EC shards") } } @@ -613,13 +703,24 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error { replicas = []string{t.server} } - glog.V(1).Infof("Deleting volume %d from %d replica servers: %v", t.volumeID, len(replicas), replicas) + t.GetLogger().WithFields(map[string]interface{}{ + "volume_id": t.volumeID, + "replica_count": len(replicas), + "replica_servers": replicas, + }).Info("Starting original volume deletion from replica servers") // Delete volume from all replica locations var deleteErrors []string successCount := 0 - for _, replicaServer := range replicas { + for i, replicaServer := range replicas { + t.GetLogger().WithFields(map[string]interface{}{ + "replica_index": i + 1, + "total_replicas": len(replicas), + "server": replicaServer, + "volume_id": t.volumeID, + }).Info("Deleting volume from replica server") + err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), grpc.WithInsecure(), func(client volume_server_pb.VolumeServerClient) error { _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ @@ -631,27 +732,52 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error { if err != nil { deleteErrors = append(deleteErrors, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, replicaServer, err)) - glog.Warningf("Failed to delete volume %d from replica server %s: %v", t.volumeID, replicaServer, err) + t.GetLogger().WithFields(map[string]interface{}{ + "server": replicaServer, + "volume_id": t.volumeID, + "error": err.Error(), + }).Error("Failed to delete volume from replica server") } else { successCount++ - glog.V(1).Infof("Successfully deleted volume %d from replica server %s", t.volumeID, replicaServer) + t.GetLogger().WithFields(map[string]interface{}{ + "server": replicaServer, + "volume_id": t.volumeID, + }).Info("Successfully deleted volume from replica server") } } // Report results if len(deleteErrors) > 0 { - glog.Warningf("Some volume deletions failed (%d/%d successful): %v", successCount, len(replicas), deleteErrors) + t.GetLogger().WithFields(map[string]interface{}{ + "volume_id": t.volumeID, + "successful": successCount, + "failed": len(deleteErrors), + "total_replicas": len(replicas), + "success_rate": float64(successCount) / float64(len(replicas)) * 100, + "errors": deleteErrors, + }).Warning("Some volume deletions failed") // Don't return error - EC task should still be considered successful if shards are mounted } else { - glog.V(1).Infof("Successfully deleted volume %d from all %d replica servers", t.volumeID, len(replicas)) + t.GetLogger().WithFields(map[string]interface{}{ + "volume_id": t.volumeID, + "replica_count": len(replicas), + "replica_servers": replicas, + }).Info("Successfully deleted volume from all replica servers") } return nil } -// getReplicas extracts replica servers from task parameters +// getReplicas extracts replica servers from unified sources func (t *ErasureCodingTask) getReplicas() []string { - // Access replicas from the parameters passed during Execute - // We'll need to store these during Execute - let me add a field to the task - return t.replicas + var replicas []string + for _, source := range t.sources { + // Only include volume replica sources (not EC shard sources) + // Assumption: VolumeId == 0 is considered invalid and should be excluded. + // If volume ID 0 is valid in some contexts, update this check accordingly. + if source.VolumeId > 0 { + replicas = append(replicas, source.Node) + } + } + return replicas } diff --git a/weed/worker/tasks/erasure_coding/register.go b/weed/worker/tasks/erasure_coding/register.go index 883aaf965..e574e0033 100644 --- a/weed/worker/tasks/erasure_coding/register.go +++ b/weed/worker/tasks/erasure_coding/register.go @@ -42,9 +42,12 @@ func RegisterErasureCodingTask() { if params == nil { return nil, fmt.Errorf("task parameters are required") } + if len(params.Sources) == 0 { + return nil, fmt.Errorf("at least one source is required for erasure coding task") + } return NewErasureCodingTask( fmt.Sprintf("erasure_coding-%d", params.VolumeId), - params.Server, + params.Sources[0].Node, // Use first source node params.VolumeId, params.Collection, ), nil |
