aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/erasure_coding
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/erasure_coding')
-rw-r--r--weed/worker/tasks/erasure_coding/detection.go194
-rw-r--r--weed/worker/tasks/erasure_coding/ec_task.go368
-rw-r--r--weed/worker/tasks/erasure_coding/register.go5
3 files changed, 368 insertions, 199 deletions
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index ec632436f..cd74bed33 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -61,6 +61,8 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ glog.Infof("EC Detection: Volume %d meets all criteria, attempting to create task", metric.VolumeID)
+
// Generate task ID for ActiveTopology integration
taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())
@@ -79,11 +81,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Plan EC destinations if ActiveTopology is available
if clusterInfo.ActiveTopology != nil {
+ glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID)
multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
if err != nil {
glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
continue // Skip this volume if destination planning fails
}
+ glog.Infof("EC Detection: Successfully planned %d destinations for volume %d", len(multiPlan.Plans), metric.VolumeID)
// Calculate expected shard size for EC operation
// Each data shard will be approximately volumeSize / dataShards
@@ -100,23 +104,27 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
}
// Find all volume replica locations (server + disk) from topology
+ glog.Infof("EC Detection: Looking for replica locations for volume %d", metric.VolumeID)
replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
if len(replicaLocations) == 0 {
glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
continue
}
+ glog.Infof("EC Detection: Found %d replica locations for volume %d", len(replicaLocations), metric.VolumeID)
// Find existing EC shards from previous failed attempts
existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
// Combine volume replicas and existing EC shards for cleanup
- var allSourceLocations []topology.TaskSourceLocation
+ var sources []topology.TaskSourceSpec
// Add volume replicas (will free volume slots)
for _, replica := range replicaLocations {
- allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+ sources = append(sources, topology.TaskSourceSpec{
ServerID: replica.ServerID,
DiskID: replica.DiskID,
+ DataCenter: replica.DataCenter,
+ Rack: replica.Rack,
CleanupType: topology.CleanupVolumeReplica,
})
}
@@ -131,9 +139,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
for _, shard := range existingECShards {
key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
- allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+ sources = append(sources, topology.TaskSourceSpec{
ServerID: shard.ServerID,
DiskID: shard.DiskID,
+ DataCenter: shard.DataCenter,
+ Rack: shard.Rack,
CleanupType: topology.CleanupECShards,
})
duplicateCheck[key] = true
@@ -141,17 +151,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
}
glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
- len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations))
-
- // Convert TaskSourceLocation to TaskSourceSpec
- sources := make([]topology.TaskSourceSpec, len(allSourceLocations))
- for i, srcLoc := range allSourceLocations {
- sources[i] = topology.TaskSourceSpec{
- ServerID: srcLoc.ServerID,
- DiskID: srcLoc.DiskID,
- CleanupType: srcLoc.CleanupType,
- }
- }
+ len(replicaLocations), len(existingECShards), metric.VolumeID, len(sources))
// Convert shard destinations to TaskDestinationSpec
destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
@@ -180,27 +180,21 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
}
glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
- taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans))
-
- // Find all volume replicas from topology (for legacy worker compatibility)
- var replicas []string
- serverSet := make(map[string]struct{})
- for _, loc := range replicaLocations {
- if _, found := serverSet[loc.ServerID]; !found {
- replicas = append(replicas, loc.ServerID)
- serverSet[loc.ServerID] = struct{}{}
- }
- }
- glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
+ taskID, metric.VolumeID, len(sources), len(multiPlan.Plans))
- // Create typed parameters with EC destination information and replicas
+ // Create unified sources and targets for EC task
result.TypedParams = &worker_pb.TaskParams{
TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: metric.VolumeID,
- Server: metric.Server,
Collection: metric.Collection,
VolumeSize: metric.Size, // Store original volume size for tracking changes
- Replicas: replicas, // Include all volume replicas for deletion
+
+ // Unified sources - all sources that will be processed/cleaned up
+ Sources: convertTaskSourcesToProtobuf(sources, metric.VolumeID),
+
+ // Unified targets - all EC shard destinations
+ Targets: createECTargets(multiPlan),
+
TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
ErasureCodingParams: createECTaskParams(multiPlan),
},
@@ -213,6 +207,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
continue // Skip this volume if no topology available
}
+ glog.Infof("EC Detection: Successfully created EC task for volume %d, adding to results", metric.VolumeID)
results = append(results, result)
} else {
// Count debug reasons
@@ -283,7 +278,8 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
// Get available disks for EC placement with effective capacity consideration (includes pending tasks)
// For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1
// For EC, we need at least 1 available volume slot on a disk to consider it for placement.
- availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1)
+ // Note: We don't exclude the source server since the original volume will be deleted after EC conversion
+ availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 1)
if len(availableDisks) < erasure_coding.MinTotalDisks {
return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
}
@@ -306,7 +302,6 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
TargetDC: disk.DataCenter,
ExpectedSize: expectedShardSize, // Set calculated EC shard size
PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
- Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC),
}
plans = append(plans, plan)
@@ -340,32 +335,96 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
}, nil
}
-// createECTaskParams creates EC task parameters from the multi-destination plan
-func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
- var destinations []*worker_pb.ECDestination
-
- for _, plan := range multiPlan.Plans {
- destination := &worker_pb.ECDestination{
- Node: plan.TargetNode,
- DiskId: plan.TargetDisk,
- Rack: plan.TargetRack,
- DataCenter: plan.TargetDC,
- PlacementScore: plan.PlacementScore,
+// createECTargets creates unified TaskTarget structures from the multi-destination plan
+// with proper shard ID assignment during planning phase
+func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.TaskTarget {
+ var targets []*worker_pb.TaskTarget
+ numTargets := len(multiPlan.Plans)
+
+ // Create shard assignment arrays for each target (round-robin distribution)
+ targetShards := make([][]uint32, numTargets)
+ for i := range targetShards {
+ targetShards[i] = make([]uint32, 0)
+ }
+
+ // Distribute shards in round-robin fashion to spread both data and parity shards
+ // This ensures each target gets a mix of data shards (0-9) and parity shards (10-13)
+ for shardId := uint32(0); shardId < uint32(erasure_coding.TotalShardsCount); shardId++ {
+ targetIndex := int(shardId) % numTargets
+ targetShards[targetIndex] = append(targetShards[targetIndex], shardId)
+ }
+
+ // Create targets with assigned shard IDs
+ for i, plan := range multiPlan.Plans {
+ target := &worker_pb.TaskTarget{
+ Node: plan.TargetNode,
+ DiskId: plan.TargetDisk,
+ Rack: plan.TargetRack,
+ DataCenter: plan.TargetDC,
+ ShardIds: targetShards[i], // Round-robin assigned shards
+ EstimatedSize: plan.ExpectedSize,
+ }
+ targets = append(targets, target)
+
+ // Log shard assignment with data/parity classification
+ dataShards := make([]uint32, 0)
+ parityShards := make([]uint32, 0)
+ for _, shardId := range targetShards[i] {
+ if shardId < uint32(erasure_coding.DataShardsCount) {
+ dataShards = append(dataShards, shardId)
+ } else {
+ parityShards = append(parityShards, shardId)
+ }
}
- destinations = append(destinations, destination)
+ glog.V(2).Infof("EC planning: target %s assigned shards %v (data: %v, parity: %v)",
+ plan.TargetNode, targetShards[i], dataShards, parityShards)
}
- // Collect placement conflicts from all destinations
- var placementConflicts []string
- for _, plan := range multiPlan.Plans {
- placementConflicts = append(placementConflicts, plan.Conflicts...)
+ glog.V(1).Infof("EC planning: distributed %d shards across %d targets using round-robin (data shards 0-%d, parity shards %d-%d)",
+ erasure_coding.TotalShardsCount, numTargets,
+ erasure_coding.DataShardsCount-1, erasure_coding.DataShardsCount, erasure_coding.TotalShardsCount-1)
+ return targets
+}
+
+// convertTaskSourcesToProtobuf converts topology.TaskSourceSpec to worker_pb.TaskSource
+func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32) []*worker_pb.TaskSource {
+ var protobufSources []*worker_pb.TaskSource
+
+ for _, source := range sources {
+ pbSource := &worker_pb.TaskSource{
+ Node: source.ServerID,
+ DiskId: source.DiskID,
+ DataCenter: source.DataCenter,
+ Rack: source.Rack,
+ }
+
+ // Convert storage impact to estimated size
+ if source.EstimatedSize != nil {
+ pbSource.EstimatedSize = uint64(*source.EstimatedSize)
+ }
+
+ // Set appropriate volume ID or shard IDs based on cleanup type
+ switch source.CleanupType {
+ case topology.CleanupVolumeReplica:
+ // This is a volume replica, use the actual volume ID
+ pbSource.VolumeId = volumeID
+ case topology.CleanupECShards:
+ // This is EC shards, also use the volume ID for consistency
+ pbSource.VolumeId = volumeID
+ // Note: ShardIds would need to be passed separately if we need specific shard info
+ }
+
+ protobufSources = append(protobufSources, pbSource)
}
+ return protobufSources
+}
+
+// createECTaskParams creates clean EC task parameters (destinations now in unified targets)
+func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
return &worker_pb.ErasureCodingTaskParams{
- Destinations: destinations,
- DataShards: erasure_coding.DataShardsCount, // Standard data shards
- ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards
- PlacementConflicts: placementConflicts,
+ DataShards: erasure_coding.DataShardsCount, // Standard data shards
+ ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards
}
}
@@ -456,25 +515,19 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
score := 0.0
- // Prefer disks with available capacity
+ // Prefer disks with available capacity (primary factor)
if disk.DiskInfo.MaxVolumeCount > 0 {
utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
- score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity
+ score += (1.0 - utilization) * 60.0 // Up to 60 points for available capacity
}
- // Prefer different racks for better distribution
- if disk.Rack != sourceRack {
- score += 30.0
- }
-
- // Prefer different data centers for better distribution
- if disk.DataCenter != sourceDC {
- score += 20.0
- }
-
- // Consider current load
+ // Consider current load (secondary factor)
score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
+ // Note: We don't penalize placing shards on the same rack/DC as source
+ // since the original volume will be deleted after EC conversion.
+ // This allows for better network efficiency and storage utilization.
+
return score
}
@@ -492,19 +545,6 @@ func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
return true
}
-// checkECPlacementConflicts checks for placement rule conflicts in EC operations
-func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
- var conflicts []string
-
- // For EC, being on the same rack as source is often acceptable
- // but we note it as potential conflict for monitoring
- if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
- conflicts = append(conflicts, "same_rack_as_source")
- }
-
- return conflicts
-}
-
// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go
index 97332f63f..18f192bc9 100644
--- a/weed/worker/tasks/erasure_coding/ec_task.go
+++ b/weed/worker/tasks/erasure_coding/ec_task.go
@@ -7,7 +7,6 @@ import (
"math"
"os"
"path/filepath"
- "sort"
"strings"
"time"
@@ -36,9 +35,9 @@ type ErasureCodingTask struct {
// EC parameters
dataShards int32
parityShards int32
- destinations []*worker_pb.ECDestination
- shardAssignment map[string][]string // destination -> assigned shard types
- replicas []string // volume replica servers for deletion
+ targets []*worker_pb.TaskTarget // Unified targets for EC shards
+ sources []*worker_pb.TaskSource // Unified sources for cleanup
+ shardAssignment map[string][]string // destination -> assigned shard types
}
// NewErasureCodingTask creates a new unified EC task instance
@@ -67,18 +66,43 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
t.dataShards = ecParams.DataShards
t.parityShards = ecParams.ParityShards
t.workDir = ecParams.WorkingDir
- t.destinations = ecParams.Destinations
- t.replicas = params.Replicas // Get replicas from task parameters
+ t.targets = params.Targets // Get unified targets
+ t.sources = params.Sources // Get unified sources
+ // Log detailed task information
t.GetLogger().WithFields(map[string]interface{}{
"volume_id": t.volumeID,
"server": t.server,
"collection": t.collection,
"data_shards": t.dataShards,
"parity_shards": t.parityShards,
- "destinations": len(t.destinations),
+ "total_shards": t.dataShards + t.parityShards,
+ "targets": len(t.targets),
+ "sources": len(t.sources),
}).Info("Starting erasure coding task")
+ // Log detailed target server assignments
+ for i, target := range t.targets {
+ t.GetLogger().WithFields(map[string]interface{}{
+ "target_index": i,
+ "server": target.Node,
+ "shard_ids": target.ShardIds,
+ "shard_count": len(target.ShardIds),
+ }).Info("Target server shard assignment")
+ }
+
+ // Log source information
+ for i, source := range t.sources {
+ t.GetLogger().WithFields(map[string]interface{}{
+ "source_index": i,
+ "server": source.Node,
+ "volume_id": source.VolumeId,
+ "disk_id": source.DiskId,
+ "rack": source.Rack,
+ "data_center": source.DataCenter,
+ }).Info("Source server information")
+ }
+
// Use the working directory from task parameters, or fall back to a default
baseWorkDir := t.workDir
@@ -112,14 +136,14 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
}()
// Step 1: Mark volume readonly
- t.ReportProgress(10.0)
+ t.ReportProgressWithStage(10.0, "Marking volume readonly")
t.GetLogger().Info("Marking volume readonly")
if err := t.markVolumeReadonly(); err != nil {
return fmt.Errorf("failed to mark volume readonly: %v", err)
}
// Step 2: Copy volume files to worker
- t.ReportProgress(25.0)
+ t.ReportProgressWithStage(25.0, "Copying volume files to worker")
t.GetLogger().Info("Copying volume files to worker")
localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
if err != nil {
@@ -127,7 +151,7 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
}
// Step 3: Generate EC shards locally
- t.ReportProgress(40.0)
+ t.ReportProgressWithStage(40.0, "Generating EC shards locally")
t.GetLogger().Info("Generating EC shards locally")
shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir)
if err != nil {
@@ -135,27 +159,27 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
}
// Step 4: Distribute shards to destinations
- t.ReportProgress(60.0)
+ t.ReportProgressWithStage(60.0, "Distributing EC shards to destinations")
t.GetLogger().Info("Distributing EC shards to destinations")
if err := t.distributeEcShards(shardFiles); err != nil {
return fmt.Errorf("failed to distribute EC shards: %v", err)
}
// Step 5: Mount EC shards
- t.ReportProgress(80.0)
+ t.ReportProgressWithStage(80.0, "Mounting EC shards")
t.GetLogger().Info("Mounting EC shards")
if err := t.mountEcShards(); err != nil {
return fmt.Errorf("failed to mount EC shards: %v", err)
}
// Step 6: Delete original volume
- t.ReportProgress(90.0)
+ t.ReportProgressWithStage(90.0, "Deleting original volume")
t.GetLogger().Info("Deleting original volume")
if err := t.deleteOriginalVolume(); err != nil {
return fmt.Errorf("failed to delete original volume: %v", err)
}
- t.ReportProgress(100.0)
+ t.ReportProgressWithStage(100.0, "EC processing complete")
glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed",
t.volumeID, t.server, len(shardFiles))
@@ -177,8 +201,16 @@ func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error {
return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
}
- if params.Server != t.server {
- return fmt.Errorf("source server mismatch: expected %s, got %s", t.server, params.Server)
+ // Validate that at least one source matches our server
+ found := false
+ for _, source := range params.Sources {
+ if source.Node == t.server {
+ found = true
+ break
+ }
+ }
+ if !found {
+ return fmt.Errorf("no source matches expected server %s", t.server)
}
if ecParams.DataShards < 1 {
@@ -189,8 +221,8 @@ func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error {
return fmt.Errorf("invalid parity shards: %d (must be >= 1)", ecParams.ParityShards)
}
- if len(ecParams.Destinations) < int(ecParams.DataShards+ecParams.ParityShards) {
- return fmt.Errorf("insufficient destinations: got %d, need %d", len(ecParams.Destinations), ecParams.DataShards+ecParams.ParityShards)
+ if len(params.Targets) < int(ecParams.DataShards+ecParams.ParityShards) {
+ return fmt.Errorf("insufficient targets: got %d, need %d", len(params.Targets), ecParams.DataShards+ecParams.ParityShards)
}
return nil
@@ -224,6 +256,12 @@ func (t *ErasureCodingTask) markVolumeReadonly() error {
func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
localFiles := make(map[string]string)
+ t.GetLogger().WithFields(map[string]interface{}{
+ "volume_id": t.volumeID,
+ "source": t.server,
+ "working_dir": workDir,
+ }).Info("Starting volume file copy from source server")
+
// Copy .dat file
datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
if err := t.copyFileFromSource(".dat", datFile); err != nil {
@@ -231,6 +269,16 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]
}
localFiles["dat"] = datFile
+ // Log .dat file size
+ if info, err := os.Stat(datFile); err == nil {
+ t.GetLogger().WithFields(map[string]interface{}{
+ "file_type": ".dat",
+ "file_path": datFile,
+ "size_bytes": info.Size(),
+ "size_mb": float64(info.Size()) / (1024 * 1024),
+ }).Info("Volume data file copied successfully")
+ }
+
// Copy .idx file
idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
if err := t.copyFileFromSource(".idx", idxFile); err != nil {
@@ -238,6 +286,16 @@ func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]
}
localFiles["idx"] = idxFile
+ // Log .idx file size
+ if info, err := os.Stat(idxFile); err == nil {
+ t.GetLogger().WithFields(map[string]interface{}{
+ "file_type": ".idx",
+ "file_path": idxFile,
+ "size_bytes": info.Size(),
+ "size_mb": float64(info.Size()) / (1024 * 1024),
+ }).Info("Volume index file copied successfully")
+ }
+
return localFiles, nil
}
@@ -312,18 +370,38 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
}
- // Collect generated shard file paths
+ // Collect generated shard file paths and log details
+ var generatedShards []string
+ var totalShardSize int64
+
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
- if _, err := os.Stat(shardFile); err == nil {
- shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile
+ if info, err := os.Stat(shardFile); err == nil {
+ shardKey := fmt.Sprintf("ec%02d", i)
+ shardFiles[shardKey] = shardFile
+ generatedShards = append(generatedShards, shardKey)
+ totalShardSize += info.Size()
+
+ // Log individual shard details
+ t.GetLogger().WithFields(map[string]interface{}{
+ "shard_id": i,
+ "shard_type": shardKey,
+ "file_path": shardFile,
+ "size_bytes": info.Size(),
+ "size_kb": float64(info.Size()) / 1024,
+ }).Info("EC shard generated")
}
}
// Add metadata files
ecxFile := baseName + ".ecx"
- if _, err := os.Stat(ecxFile); err == nil {
+ if info, err := os.Stat(ecxFile); err == nil {
shardFiles["ecx"] = ecxFile
+ t.GetLogger().WithFields(map[string]interface{}{
+ "file_type": "ecx",
+ "file_path": ecxFile,
+ "size_bytes": info.Size(),
+ }).Info("EC index file generated")
}
// Generate .vif file (volume info)
@@ -335,26 +413,67 @@ func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string
glog.Warningf("Failed to create .vif file: %v", err)
} else {
shardFiles["vif"] = vifFile
+ if info, err := os.Stat(vifFile); err == nil {
+ t.GetLogger().WithFields(map[string]interface{}{
+ "file_type": "vif",
+ "file_path": vifFile,
+ "size_bytes": info.Size(),
+ }).Info("Volume info file generated")
+ }
}
- glog.V(1).Infof("Generated %d EC files locally", len(shardFiles))
+ // Log summary of generation
+ t.GetLogger().WithFields(map[string]interface{}{
+ "total_files": len(shardFiles),
+ "ec_shards": len(generatedShards),
+ "generated_shards": generatedShards,
+ "total_shard_size_mb": float64(totalShardSize) / (1024 * 1024),
+ }).Info("EC shard generation completed")
return shardFiles, nil
}
// distributeEcShards distributes locally generated EC shards to destination servers
+// using pre-assigned shard IDs from planning phase
func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error {
- if len(t.destinations) == 0 {
- return fmt.Errorf("no destinations specified for EC shard distribution")
+ if len(t.targets) == 0 {
+ return fmt.Errorf("no targets specified for EC shard distribution")
}
if len(shardFiles) == 0 {
return fmt.Errorf("no shard files available for distribution")
}
- // Create shard assignment: assign specific shards to specific destinations
- shardAssignment := t.createShardAssignment(shardFiles)
+ // Build shard assignment from pre-assigned target shard IDs (from planning phase)
+ shardAssignment := make(map[string][]string)
+
+ for _, target := range t.targets {
+ if len(target.ShardIds) == 0 {
+ continue // Skip targets with no assigned shards
+ }
+
+ var assignedShards []string
+
+ // Convert shard IDs to shard file names (e.g., 0 → "ec00", 1 → "ec01")
+ for _, shardId := range target.ShardIds {
+ shardType := fmt.Sprintf("ec%02d", shardId)
+ assignedShards = append(assignedShards, shardType)
+ }
+
+ // Add metadata files (.ecx, .vif) to targets that have shards
+ if len(assignedShards) > 0 {
+ if _, hasEcx := shardFiles["ecx"]; hasEcx {
+ assignedShards = append(assignedShards, "ecx")
+ }
+ if _, hasVif := shardFiles["vif"]; hasVif {
+ assignedShards = append(assignedShards, "vif")
+ }
+ }
+
+ shardAssignment[target.Node] = assignedShards
+ }
+
if len(shardAssignment) == 0 {
- return fmt.Errorf("failed to create shard assignment")
+ return fmt.Errorf("no shard assignments found from planning phase")
}
// Store assignment for use during mounting
@@ -365,100 +484,50 @@ func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) err
t.GetLogger().WithFields(map[string]interface{}{
"destination": destNode,
"assigned_shards": len(assignedShards),
- "shard_ids": assignedShards,
- }).Info("Distributing assigned EC shards to destination")
+ "shard_types": assignedShards,
+ }).Info("Starting shard distribution to destination server")
// Send only the assigned shards to this destination
+ var transferredBytes int64
for _, shardType := range assignedShards {
filePath, exists := shardFiles[shardType]
if !exists {
return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
}
+ // Log file size before transfer
+ if info, err := os.Stat(filePath); err == nil {
+ transferredBytes += info.Size()
+ t.GetLogger().WithFields(map[string]interface{}{
+ "destination": destNode,
+ "shard_type": shardType,
+ "file_path": filePath,
+ "size_bytes": info.Size(),
+ "size_kb": float64(info.Size()) / 1024,
+ }).Info("Starting shard file transfer")
+ }
+
if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil {
return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err)
}
- }
- }
- glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
- return nil
-}
-
-// createShardAssignment assigns specific EC shards to specific destination servers
-// Each destination gets a subset of shards based on availability and placement rules
-func (t *ErasureCodingTask) createShardAssignment(shardFiles map[string]string) map[string][]string {
- assignment := make(map[string][]string)
-
- // Collect all available EC shards (ec00-ec13)
- var availableShards []string
- for shardType := range shardFiles {
- if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
- availableShards = append(availableShards, shardType)
- }
- }
-
- // Sort shards for consistent assignment
- sort.Strings(availableShards)
-
- if len(availableShards) == 0 {
- glog.Warningf("No EC shards found for assignment")
- return assignment
- }
-
- // Calculate shards per destination
- numDestinations := len(t.destinations)
- if numDestinations == 0 {
- return assignment
- }
-
- // Strategy: Distribute shards as evenly as possible across destinations
- // With 14 shards and N destinations, some destinations get ⌈14/N⌉ shards, others get ⌊14/N⌋
- shardsPerDest := len(availableShards) / numDestinations
- extraShards := len(availableShards) % numDestinations
-
- shardIndex := 0
- for i, dest := range t.destinations {
- var destShards []string
-
- // Assign base number of shards
- shardsToAssign := shardsPerDest
-
- // Assign one extra shard to first 'extraShards' destinations
- if i < extraShards {
- shardsToAssign++
- }
-
- // Assign the shards
- for j := 0; j < shardsToAssign && shardIndex < len(availableShards); j++ {
- destShards = append(destShards, availableShards[shardIndex])
- shardIndex++
+ t.GetLogger().WithFields(map[string]interface{}{
+ "destination": destNode,
+ "shard_type": shardType,
+ }).Info("Shard file transfer completed")
}
- assignment[dest.Node] = destShards
-
- glog.V(2).Infof("Assigned shards %v to destination %s", destShards, dest.Node)
- }
-
- // Assign metadata files (.ecx, .vif) to each destination that has shards
- // Note: .ecj files are created during mount, not during initial generation
- for destNode, destShards := range assignment {
- if len(destShards) > 0 {
- // Add .ecx file if available
- if _, hasEcx := shardFiles["ecx"]; hasEcx {
- assignment[destNode] = append(assignment[destNode], "ecx")
- }
-
- // Add .vif file if available
- if _, hasVif := shardFiles["vif"]; hasVif {
- assignment[destNode] = append(assignment[destNode], "vif")
- }
-
- glog.V(2).Infof("Assigned metadata files (.ecx, .vif) to destination %s", destNode)
- }
+ // Log summary for this destination
+ t.GetLogger().WithFields(map[string]interface{}{
+ "destination": destNode,
+ "shards_transferred": len(assignedShards),
+ "total_bytes": transferredBytes,
+ "total_mb": float64(transferredBytes) / (1024 * 1024),
+ }).Info("All shards distributed to destination server")
}
- return assignment
+ glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
+ return nil
}
// sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API
@@ -565,6 +634,8 @@ func (t *ErasureCodingTask) mountEcShards() error {
for destNode, assignedShards := range t.shardAssignment {
// Convert shard names to shard IDs for mounting
var shardIds []uint32
+ var metadataFiles []string
+
for _, shardType := range assignedShards {
// Skip metadata files (.ecx, .vif) - only mount EC shards
if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
@@ -573,16 +644,26 @@ func (t *ErasureCodingTask) mountEcShards() error {
if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err == nil {
shardIds = append(shardIds, shardId)
}
+ } else {
+ metadataFiles = append(metadataFiles, shardType)
}
}
+ t.GetLogger().WithFields(map[string]interface{}{
+ "destination": destNode,
+ "shard_ids": shardIds,
+ "shard_count": len(shardIds),
+ "metadata_files": metadataFiles,
+ }).Info("Starting EC shard mount operation")
+
if len(shardIds) == 0 {
- glog.V(1).Infof("No EC shards to mount on %s (only metadata files)", destNode)
+ t.GetLogger().WithFields(map[string]interface{}{
+ "destination": destNode,
+ "metadata_files": metadataFiles,
+ }).Info("No EC shards to mount (only metadata files)")
continue
}
- glog.V(1).Infof("Mounting shards %v on %s", shardIds, destNode)
-
err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
@@ -594,9 +675,18 @@ func (t *ErasureCodingTask) mountEcShards() error {
})
if err != nil {
- glog.Warningf("Failed to mount shards %v on %s: %v", shardIds, destNode, err)
+ t.GetLogger().WithFields(map[string]interface{}{
+ "destination": destNode,
+ "shard_ids": shardIds,
+ "error": err.Error(),
+ }).Error("Failed to mount EC shards")
} else {
- glog.V(1).Infof("Successfully mounted EC shards %v on %s", shardIds, destNode)
+ t.GetLogger().WithFields(map[string]interface{}{
+ "destination": destNode,
+ "shard_ids": shardIds,
+ "volume_id": t.volumeID,
+ "collection": t.collection,
+ }).Info("Successfully mounted EC shards")
}
}
@@ -613,13 +703,24 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error {
replicas = []string{t.server}
}
- glog.V(1).Infof("Deleting volume %d from %d replica servers: %v", t.volumeID, len(replicas), replicas)
+ t.GetLogger().WithFields(map[string]interface{}{
+ "volume_id": t.volumeID,
+ "replica_count": len(replicas),
+ "replica_servers": replicas,
+ }).Info("Starting original volume deletion from replica servers")
// Delete volume from all replica locations
var deleteErrors []string
successCount := 0
- for _, replicaServer := range replicas {
+ for i, replicaServer := range replicas {
+ t.GetLogger().WithFields(map[string]interface{}{
+ "replica_index": i + 1,
+ "total_replicas": len(replicas),
+ "server": replicaServer,
+ "volume_id": t.volumeID,
+ }).Info("Deleting volume from replica server")
+
err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), grpc.WithInsecure(),
func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
@@ -631,27 +732,52 @@ func (t *ErasureCodingTask) deleteOriginalVolume() error {
if err != nil {
deleteErrors = append(deleteErrors, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, replicaServer, err))
- glog.Warningf("Failed to delete volume %d from replica server %s: %v", t.volumeID, replicaServer, err)
+ t.GetLogger().WithFields(map[string]interface{}{
+ "server": replicaServer,
+ "volume_id": t.volumeID,
+ "error": err.Error(),
+ }).Error("Failed to delete volume from replica server")
} else {
successCount++
- glog.V(1).Infof("Successfully deleted volume %d from replica server %s", t.volumeID, replicaServer)
+ t.GetLogger().WithFields(map[string]interface{}{
+ "server": replicaServer,
+ "volume_id": t.volumeID,
+ }).Info("Successfully deleted volume from replica server")
}
}
// Report results
if len(deleteErrors) > 0 {
- glog.Warningf("Some volume deletions failed (%d/%d successful): %v", successCount, len(replicas), deleteErrors)
+ t.GetLogger().WithFields(map[string]interface{}{
+ "volume_id": t.volumeID,
+ "successful": successCount,
+ "failed": len(deleteErrors),
+ "total_replicas": len(replicas),
+ "success_rate": float64(successCount) / float64(len(replicas)) * 100,
+ "errors": deleteErrors,
+ }).Warning("Some volume deletions failed")
// Don't return error - EC task should still be considered successful if shards are mounted
} else {
- glog.V(1).Infof("Successfully deleted volume %d from all %d replica servers", t.volumeID, len(replicas))
+ t.GetLogger().WithFields(map[string]interface{}{
+ "volume_id": t.volumeID,
+ "replica_count": len(replicas),
+ "replica_servers": replicas,
+ }).Info("Successfully deleted volume from all replica servers")
}
return nil
}
-// getReplicas extracts replica servers from task parameters
+// getReplicas extracts replica servers from unified sources
func (t *ErasureCodingTask) getReplicas() []string {
- // Access replicas from the parameters passed during Execute
- // We'll need to store these during Execute - let me add a field to the task
- return t.replicas
+ var replicas []string
+ for _, source := range t.sources {
+ // Only include volume replica sources (not EC shard sources)
+ // Assumption: VolumeId == 0 is considered invalid and should be excluded.
+ // If volume ID 0 is valid in some contexts, update this check accordingly.
+ if source.VolumeId > 0 {
+ replicas = append(replicas, source.Node)
+ }
+ }
+ return replicas
}
diff --git a/weed/worker/tasks/erasure_coding/register.go b/weed/worker/tasks/erasure_coding/register.go
index 883aaf965..e574e0033 100644
--- a/weed/worker/tasks/erasure_coding/register.go
+++ b/weed/worker/tasks/erasure_coding/register.go
@@ -42,9 +42,12 @@ func RegisterErasureCodingTask() {
if params == nil {
return nil, fmt.Errorf("task parameters are required")
}
+ if len(params.Sources) == 0 {
+ return nil, fmt.Errorf("at least one source is required for erasure coding task")
+ }
return NewErasureCodingTask(
fmt.Sprintf("erasure_coding-%d", params.VolumeId),
- params.Server,
+ params.Sources[0].Node, // Use first source node
params.VolumeId,
params.Collection,
), nil