diff options
Diffstat (limited to 'weed/worker/tasks/erasure_coding')
| -rw-r--r-- | weed/worker/tasks/erasure_coding/detection.go | 314 | ||||
| -rw-r--r-- | weed/worker/tasks/erasure_coding/ec.go | 785 | ||||
| -rw-r--r-- | weed/worker/tasks/erasure_coding/ec_task.go | 660 | ||||
| -rw-r--r-- | weed/worker/tasks/erasure_coding/monitoring.go | 229 | ||||
| -rw-r--r-- | weed/worker/tasks/erasure_coding/register.go (renamed from weed/worker/tasks/erasure_coding/ec_register.go) | 17 | ||||
| -rw-r--r-- | weed/worker/tasks/erasure_coding/scheduling.go | 40 |
6 files changed, 1238 insertions, 807 deletions
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index 450080a12..1122d2721 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -5,7 +5,10 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/admin/topology" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -69,6 +72,38 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB), ScheduleAt: now, } + + // Plan EC destinations if ActiveTopology is available + if clusterInfo.ActiveTopology != nil { + multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig) + if err != nil { + glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err) + continue // Skip this volume if destination planning fails + } + + // Find all volume replicas from topology + replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection) + glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas) + + // Create typed parameters with EC destination information and replicas + result.TypedParams = &worker_pb.TaskParams{ + VolumeId: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + VolumeSize: metric.Size, // Store original volume size for tracking changes + Replicas: replicas, // Include all volume replicas for deletion + TaskParams: &worker_pb.TaskParams_ErasureCodingParams{ + ErasureCodingParams: createECTaskParams(multiPlan), + }, + } + + glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs", + metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs) + } else { + glog.Warningf("No ActiveTopology available for destination planning in EC detection") + continue // Skip this volume if no topology available + } + results = append(results, result) } else { // Count debug reasons @@ -105,36 +140,277 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI return results, nil } -// Scheduling implements the scheduling logic for erasure coding tasks -func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { - ecConfig := config.(*Config) +// planECDestinations plans the destinations for erasure coding operation +// This function implements EC destination planning logic directly in the detection phase +func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) { + // Get source node information from topology + var sourceRack, sourceDC string - // Check if we have available workers - if len(availableWorkers) == 0 { - return false + // Extract rack and DC from topology info + topologyInfo := activeTopology.GetTopologyInfo() + if topologyInfo != nil { + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, dataNodeInfo := range rack.DataNodeInfos { + if dataNodeInfo.Id == metric.Server { + sourceDC = dc.Id + sourceRack = rack.Id + break + } + } + if sourceRack != "" { + break + } + } + if sourceDC != "" { + break + } + } + } + + // Determine minimum shard disk locations based on configuration + minTotalDisks := 4 + + // Get available disks for EC placement (include source node for EC) + availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "") + if len(availableDisks) < minTotalDisks { + return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", minTotalDisks, len(availableDisks)) + } + + // Select best disks for EC placement with rack/DC diversity + selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount) + if len(selectedDisks) < minTotalDisks { + return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), minTotalDisks) + } + + var plans []*topology.DestinationPlan + rackCount := make(map[string]int) + dcCount := make(map[string]int) + + for _, disk := range selectedDisks { + plan := &topology.DestinationPlan{ + TargetNode: disk.NodeID, + TargetDisk: disk.DiskID, + TargetRack: disk.Rack, + TargetDC: disk.DataCenter, + ExpectedSize: 0, // EC shards don't have predetermined size + PlacementScore: calculateECScore(disk, sourceRack, sourceDC), + Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC), + } + plans = append(plans, plan) + + // Count rack and DC diversity + rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack) + rackCount[rackKey]++ + dcCount[disk.DataCenter]++ + } + + return &topology.MultiDestinationPlan{ + Plans: plans, + TotalShards: len(plans), + SuccessfulRack: len(rackCount), + SuccessfulDCs: len(dcCount), + }, nil +} + +// createECTaskParams creates EC task parameters from the multi-destination plan +func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams { + var destinations []*worker_pb.ECDestination + + for _, plan := range multiPlan.Plans { + destination := &worker_pb.ECDestination{ + Node: plan.TargetNode, + DiskId: plan.TargetDisk, + Rack: plan.TargetRack, + DataCenter: plan.TargetDC, + PlacementScore: plan.PlacementScore, + } + destinations = append(destinations, destination) + } + + // Collect placement conflicts from all destinations + var placementConflicts []string + for _, plan := range multiPlan.Plans { + placementConflicts = append(placementConflicts, plan.Conflicts...) + } + + return &worker_pb.ErasureCodingTaskParams{ + Destinations: destinations, + DataShards: erasure_coding.DataShardsCount, // Standard data shards + ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards + PlacementConflicts: placementConflicts, + } +} + +// selectBestECDestinations selects multiple disks for EC shard placement with diversity +func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo { + if len(disks) == 0 { + return nil + } + + // Group disks by rack and DC for diversity + rackGroups := make(map[string][]*topology.DiskInfo) + for _, disk := range disks { + rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack) + rackGroups[rackKey] = append(rackGroups[rackKey], disk) + } + + var selected []*topology.DiskInfo + usedRacks := make(map[string]bool) + + // First pass: select one disk from each rack for maximum diversity + for rackKey, rackDisks := range rackGroups { + if len(selected) >= shardsNeeded { + break + } + + // Select best disk from this rack + bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC) + if bestDisk != nil { + selected = append(selected, bestDisk) + usedRacks[rackKey] = true + } + } + + // Second pass: if we need more disks, select from racks we've already used + if len(selected) < shardsNeeded { + for _, disk := range disks { + if len(selected) >= shardsNeeded { + break + } + + // Skip if already selected + alreadySelected := false + for _, sel := range selected { + if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID { + alreadySelected = true + break + } + } + + if !alreadySelected && isDiskSuitableForEC(disk) { + selected = append(selected, disk) + } + } + } + + return selected +} + +// selectBestFromRack selects the best disk from a rack for EC placement +func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo { + if len(disks) == 0 { + return nil } - // Count running EC tasks - runningCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeErasureCoding { - runningCount++ + var bestDisk *topology.DiskInfo + bestScore := -1.0 + + for _, disk := range disks { + if !isDiskSuitableForEC(disk) { + continue + } + + score := calculateECScore(disk, sourceRack, sourceDC) + if score > bestScore { + bestScore = score + bestDisk = disk } } - // Check concurrency limit - if runningCount >= ecConfig.MaxConcurrent { + return bestDisk +} + +// calculateECScore calculates placement score for EC operations +func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 { + if disk.DiskInfo == nil { + return 0.0 + } + + score := 0.0 + + // Prefer disks with available capacity + if disk.DiskInfo.MaxVolumeCount > 0 { + utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount) + score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity + } + + // Prefer different racks for better distribution + if disk.Rack != sourceRack { + score += 30.0 + } + + // Prefer different data centers for better distribution + if disk.DataCenter != sourceDC { + score += 20.0 + } + + // Consider current load + score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load + + return score +} + +// isDiskSuitableForEC checks if a disk is suitable for EC placement +func isDiskSuitableForEC(disk *topology.DiskInfo) bool { + if disk.DiskInfo == nil { + return false + } + + // Check if disk has capacity + if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount { + return false + } + + // Check if disk is not overloaded + if disk.LoadCount > 10 { // Arbitrary threshold return false } - // Check if any worker can handle EC tasks - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeErasureCoding { - return true + return true +} + +// checkECPlacementConflicts checks for placement rule conflicts in EC operations +func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string { + var conflicts []string + + // For EC, being on the same rack as source is often acceptable + // but we note it as potential conflict for monitoring + if disk.Rack == sourceRack && disk.DataCenter == sourceDC { + conflicts = append(conflicts, "same_rack_as_source") + } + + return conflicts +} + +// findVolumeReplicas finds all servers that have replicas of the specified volume +func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string { + if activeTopology == nil { + return []string{} + } + + topologyInfo := activeTopology.GetTopologyInfo() + if topologyInfo == nil { + return []string{} + } + + var replicaServers []string + + // Iterate through all nodes to find volume replicas + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, nodeInfo := range rack.DataNodeInfos { + for _, diskInfo := range nodeInfo.DiskInfos { + for _, volumeInfo := range diskInfo.VolumeInfos { + if volumeInfo.Id == volumeID && volumeInfo.Collection == collection { + replicaServers = append(replicaServers, nodeInfo.Id) + break // Found volume on this node, move to next node + } + } + } } } } - return false + return replicaServers } diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go deleted file mode 100644 index 8dc7a1cd0..000000000 --- a/weed/worker/tasks/erasure_coding/ec.go +++ /dev/null @@ -1,785 +0,0 @@ -package erasure_coding - -import ( - "context" - "fmt" - "io" - "math" - "os" - "path/filepath" - "strings" - "sync" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/operation" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" - "github.com/seaweedfs/seaweedfs/weed/worker/types" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -// Task implements comprehensive erasure coding with protobuf parameters -type Task struct { - *base.BaseTypedTask - - // Current task state - sourceServer string - volumeID uint32 - collection string - workDir string - masterClient string - grpcDialOpt grpc.DialOption - - // EC parameters from protobuf - destinations []*worker_pb.ECDestination // Disk-aware destinations - existingShardLocations []*worker_pb.ExistingECShardLocation // Existing shards to cleanup - estimatedShardSize uint64 - dataShards int - parityShards int - cleanupSource bool - - // Progress tracking - currentStep string - stepProgress map[string]float64 -} - -// NewTask creates a new erasure coding task -func NewTask() types.TypedTaskInterface { - task := &Task{ - BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeErasureCoding), - masterClient: "localhost:9333", // Default master client - workDir: "/tmp/seaweedfs_ec_work", // Default work directory - grpcDialOpt: grpc.WithTransportCredentials(insecure.NewCredentials()), // Default to insecure - dataShards: erasure_coding.DataShardsCount, // Use package constant - parityShards: erasure_coding.ParityShardsCount, // Use package constant - stepProgress: make(map[string]float64), - } - return task -} - -// ValidateTyped validates the typed parameters for EC task -func (t *Task) ValidateTyped(params *worker_pb.TaskParams) error { - // Basic validation from base class - if err := t.BaseTypedTask.ValidateTyped(params); err != nil { - return err - } - - // Check that we have EC-specific parameters - ecParams := params.GetErasureCodingParams() - if ecParams == nil { - return fmt.Errorf("erasure_coding_params is required for EC task") - } - - // Require destinations - if len(ecParams.Destinations) == 0 { - return fmt.Errorf("destinations must be specified for EC task") - } - - // DataShards and ParityShards are constants from erasure_coding package - expectedDataShards := int32(erasure_coding.DataShardsCount) - expectedParityShards := int32(erasure_coding.ParityShardsCount) - - if ecParams.DataShards > 0 && ecParams.DataShards != expectedDataShards { - return fmt.Errorf("data_shards must be %d (fixed constant), got %d", expectedDataShards, ecParams.DataShards) - } - if ecParams.ParityShards > 0 && ecParams.ParityShards != expectedParityShards { - return fmt.Errorf("parity_shards must be %d (fixed constant), got %d", expectedParityShards, ecParams.ParityShards) - } - - // Validate destination count - destinationCount := len(ecParams.Destinations) - totalShards := expectedDataShards + expectedParityShards - if totalShards > int32(destinationCount) { - return fmt.Errorf("insufficient destinations: need %d, have %d", totalShards, destinationCount) - } - - return nil -} - -// EstimateTimeTyped estimates the time needed for EC processing based on protobuf parameters -func (t *Task) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration { - baseTime := 20 * time.Minute // Processing takes time due to comprehensive operations - - ecParams := params.GetErasureCodingParams() - if ecParams != nil && ecParams.EstimatedShardSize > 0 { - // More accurate estimate based on shard size - // Account for copying, encoding, and distribution - gbSize := ecParams.EstimatedShardSize / (1024 * 1024 * 1024) - estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB - if estimatedTime > baseTime { - return estimatedTime - } - } - - return baseTime -} - -// ExecuteTyped implements the actual erasure coding workflow with typed parameters -func (t *Task) ExecuteTyped(params *worker_pb.TaskParams) error { - // Extract basic parameters - t.volumeID = params.VolumeId - t.sourceServer = params.Server - t.collection = params.Collection - - // Extract EC-specific parameters - ecParams := params.GetErasureCodingParams() - if ecParams != nil { - t.destinations = ecParams.Destinations // Store disk-aware destinations - t.existingShardLocations = ecParams.ExistingShardLocations // Store existing shards for cleanup - t.estimatedShardSize = ecParams.EstimatedShardSize - t.cleanupSource = ecParams.CleanupSource - - // DataShards and ParityShards are constants, don't override from parameters - // t.dataShards and t.parityShards are already set to constants in NewTask - - if ecParams.WorkingDir != "" { - t.workDir = ecParams.WorkingDir - } - if ecParams.MasterClient != "" { - t.masterClient = ecParams.MasterClient - } - } - - // Determine available destinations for logging - var availableDestinations []string - for _, dest := range t.destinations { - availableDestinations = append(availableDestinations, fmt.Sprintf("%s(disk:%d)", dest.Node, dest.DiskId)) - } - - glog.V(1).Infof("Starting EC task for volume %d: %s -> %v (data:%d, parity:%d)", - t.volumeID, t.sourceServer, availableDestinations, t.dataShards, t.parityShards) - - // Create unique working directory for this task - taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix())) - if err := os.MkdirAll(taskWorkDir, 0755); err != nil { - return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err) - } - glog.V(1).Infof("WORKFLOW: Created working directory: %s", taskWorkDir) - - // Ensure cleanup of working directory - defer func() { - if err := os.RemoveAll(taskWorkDir); err != nil { - glog.Warningf("Failed to cleanup working directory %s: %v", taskWorkDir, err) - } else { - glog.V(1).Infof("WORKFLOW: Cleaned up working directory: %s", taskWorkDir) - } - }() - - // Step 1: Collect volume locations from master - glog.V(1).Infof("WORKFLOW STEP 1: Collecting volume locations from master") - t.SetProgress(5.0) - volumeId := needle.VolumeId(t.volumeID) - volumeLocations, err := t.collectVolumeLocations(volumeId) - if err != nil { - return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err) - } - glog.V(1).Infof("WORKFLOW: Found volume %d on %d servers: %v", t.volumeID, len(volumeLocations), volumeLocations) - - // Convert ServerAddress slice to string slice - var locationStrings []string - for _, addr := range volumeLocations { - locationStrings = append(locationStrings, string(addr)) - } - - // Step 2: Check if volume has sufficient size for EC encoding - if !t.shouldPerformECEncoding(locationStrings) { - glog.Infof("Volume %d does not meet EC encoding criteria, skipping", t.volumeID) - t.SetProgress(100.0) - return nil - } - - // Step 2A: Cleanup existing EC shards if any - glog.V(1).Infof("WORKFLOW STEP 2A: Cleaning up existing EC shards for volume %d", t.volumeID) - t.SetProgress(10.0) - err = t.cleanupExistingEcShards() - if err != nil { - glog.Warningf("Failed to cleanup existing EC shards (continuing anyway): %v", err) - // Don't fail the task - this is just cleanup - } - glog.V(1).Infof("WORKFLOW: Existing EC shards cleanup completed for volume %d", t.volumeID) - - // Step 3: Mark volume readonly on all servers - glog.V(1).Infof("WORKFLOW STEP 2B: Marking volume %d readonly on all replica servers", t.volumeID) - t.SetProgress(15.0) - err = t.markVolumeReadonlyOnAllReplicas(needle.VolumeId(t.volumeID), locationStrings) - if err != nil { - return fmt.Errorf("failed to mark volume readonly: %v", err) - } - glog.V(1).Infof("WORKFLOW: Volume %d marked readonly on all replicas", t.volumeID) - - // Step 5: Copy volume files (.dat, .idx) to EC worker - glog.V(1).Infof("WORKFLOW STEP 3: Copying volume files from source server %s to EC worker", t.sourceServer) - t.SetProgress(25.0) - localVolumeFiles, err := t.copyVolumeFilesToWorker(taskWorkDir) - if err != nil { - return fmt.Errorf("failed to copy volume files to EC worker: %v", err) - } - glog.V(1).Infof("WORKFLOW: Volume files copied to EC worker: %v", localVolumeFiles) - - // Step 6: Generate EC shards locally on EC worker - glog.V(1).Infof("WORKFLOW STEP 4: Generating EC shards locally on EC worker") - t.SetProgress(40.0) - localShardFiles, err := t.generateEcShardsLocally(localVolumeFiles, taskWorkDir) - if err != nil { - return fmt.Errorf("failed to generate EC shards locally: %v", err) - } - glog.V(1).Infof("WORKFLOW: EC shards generated locally: %d shard files", len(localShardFiles)) - - // Step 7: Distribute shards from EC worker to destination servers - glog.V(1).Infof("WORKFLOW STEP 5: Distributing EC shards from worker to destination servers") - t.SetProgress(60.0) - err = t.distributeEcShardsFromWorker(localShardFiles) - if err != nil { - return fmt.Errorf("failed to distribute EC shards from worker: %v", err) - } - glog.V(1).Infof("WORKFLOW: EC shards distributed to all destination servers") - - // Step 8: Mount EC shards on destination servers - glog.V(1).Infof("WORKFLOW STEP 6: Mounting EC shards on destination servers") - t.SetProgress(80.0) - err = t.mountEcShardsOnDestinations() - if err != nil { - return fmt.Errorf("failed to mount EC shards: %v", err) - } - glog.V(1).Infof("WORKFLOW: EC shards mounted successfully") - - // Step 9: Delete original volume from all locations - glog.V(1).Infof("WORKFLOW STEP 7: Deleting original volume %d from all replica servers", t.volumeID) - t.SetProgress(90.0) - err = t.deleteVolumeFromAllLocations(needle.VolumeId(t.volumeID), locationStrings) - if err != nil { - return fmt.Errorf("failed to delete original volume: %v", err) - } - glog.V(1).Infof("WORKFLOW: Original volume %d deleted from all locations", t.volumeID) - - t.SetProgress(100.0) - glog.Infof("EC task completed successfully for volume %d", t.volumeID) - return nil -} - -// collectVolumeLocations gets volume location from master (placeholder implementation) -func (t *Task) collectVolumeLocations(volumeId needle.VolumeId) ([]pb.ServerAddress, error) { - // For now, return a placeholder implementation - // Full implementation would call master to get volume locations - return []pb.ServerAddress{pb.ServerAddress(t.sourceServer)}, nil -} - -// cleanupExistingEcShards deletes existing EC shards using planned locations -func (t *Task) cleanupExistingEcShards() error { - if len(t.existingShardLocations) == 0 { - glog.V(1).Infof("No existing EC shards to cleanup for volume %d", t.volumeID) - return nil - } - - glog.V(1).Infof("Cleaning up existing EC shards for volume %d on %d servers", t.volumeID, len(t.existingShardLocations)) - - // Delete existing shards from each location using planned shard locations - for _, location := range t.existingShardLocations { - if len(location.ShardIds) == 0 { - continue - } - - glog.V(1).Infof("Deleting existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID) - - err := operation.WithVolumeServerClient(false, pb.ServerAddress(location.Node), t.grpcDialOpt, - func(client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - ShardIds: location.ShardIds, - }) - return deleteErr - }) - - if err != nil { - glog.Errorf("Failed to delete existing EC shards %v from %s for volume %d: %v", location.ShardIds, location.Node, t.volumeID, err) - // Continue with other servers - don't fail the entire cleanup - } else { - glog.V(1).Infof("Successfully deleted existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID) - } - } - - glog.V(1).Infof("Completed cleanup of existing EC shards for volume %d", t.volumeID) - return nil -} - -// shouldPerformECEncoding checks if the volume meets criteria for EC encoding -func (t *Task) shouldPerformECEncoding(volumeLocations []string) bool { - // For now, always proceed with EC encoding if volume exists - // This can be extended with volume size checks, etc. - return len(volumeLocations) > 0 -} - -// markVolumeReadonlyOnAllReplicas marks the volume as readonly on all replica servers -func (t *Task) markVolumeReadonlyOnAllReplicas(volumeId needle.VolumeId, volumeLocations []string) error { - glog.V(1).Infof("Marking volume %d readonly on %d servers", volumeId, len(volumeLocations)) - - // Mark volume readonly on all replica servers - for _, location := range volumeLocations { - glog.V(1).Infof("Marking volume %d readonly on %s", volumeId, location) - - err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt, - func(client volume_server_pb.VolumeServerClient) error { - _, markErr := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ - VolumeId: uint32(volumeId), - }) - return markErr - }) - - if err != nil { - glog.Errorf("Failed to mark volume %d readonly on %s: %v", volumeId, location, err) - return fmt.Errorf("failed to mark volume %d readonly on %s: %v", volumeId, location, err) - } - - glog.V(1).Infof("Successfully marked volume %d readonly on %s", volumeId, location) - } - - glog.V(1).Infof("Successfully marked volume %d readonly on all %d locations", volumeId, len(volumeLocations)) - return nil -} - -// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker -func (t *Task) copyVolumeFilesToWorker(workDir string) (map[string]string, error) { - localFiles := make(map[string]string) - - // Copy .dat file - datFile := fmt.Sprintf("%s.dat", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID))) - err := t.copyFileFromSource(".dat", datFile) - if err != nil { - return nil, fmt.Errorf("failed to copy .dat file: %v", err) - } - localFiles["dat"] = datFile - glog.V(1).Infof("Copied .dat file to: %s", datFile) - - // Copy .idx file - idxFile := fmt.Sprintf("%s.idx", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID))) - err = t.copyFileFromSource(".idx", idxFile) - if err != nil { - return nil, fmt.Errorf("failed to copy .idx file: %v", err) - } - localFiles["idx"] = idxFile - glog.V(1).Infof("Copied .idx file to: %s", idxFile) - - return localFiles, nil -} - -// copyFileFromSource copies a file from source server to local path using gRPC streaming -func (t *Task) copyFileFromSource(ext, localPath string) error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.sourceServer), t.grpcDialOpt, - func(client volume_server_pb.VolumeServerClient) error { - stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - Ext: ext, - StopOffset: uint64(math.MaxInt64), - }) - if err != nil { - return fmt.Errorf("failed to initiate file copy: %v", err) - } - - // Create local file - localFile, err := os.Create(localPath) - if err != nil { - return fmt.Errorf("failed to create local file %s: %v", localPath, err) - } - defer localFile.Close() - - // Stream data and write to local file - totalBytes := int64(0) - for { - resp, err := stream.Recv() - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("failed to receive file data: %v", err) - } - - if len(resp.FileContent) > 0 { - written, writeErr := localFile.Write(resp.FileContent) - if writeErr != nil { - return fmt.Errorf("failed to write to local file: %v", writeErr) - } - totalBytes += int64(written) - } - } - - glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.sourceServer, localPath) - return nil - }) -} - -// generateEcShardsLocally generates EC shards from local volume files -func (t *Task) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) { - datFile := localFiles["dat"] - idxFile := localFiles["idx"] - - if datFile == "" || idxFile == "" { - return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile) - } - - // Get base name without extension for EC operations - baseName := strings.TrimSuffix(datFile, ".dat") - - shardFiles := make(map[string]string) - - glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile) - - // Generate EC shard files (.ec00 ~ .ec13) - if err := erasure_coding.WriteEcFiles(baseName); err != nil { - return nil, fmt.Errorf("failed to generate EC shard files: %v", err) - } - - // Generate .ecx file from .idx - if err := erasure_coding.WriteSortedFileFromIdx(idxFile, ".ecx"); err != nil { - return nil, fmt.Errorf("failed to generate .ecx file: %v", err) - } - - // Collect generated shard file paths - for i := 0; i < erasure_coding.TotalShardsCount; i++ { - shardFile := fmt.Sprintf("%s.ec%02d", baseName, i) - if _, err := os.Stat(shardFile); err == nil { - shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile - } - } - - // Add metadata files - ecxFile := idxFile + ".ecx" - if _, err := os.Stat(ecxFile); err == nil { - shardFiles["ecx"] = ecxFile - } - - // Generate .vif file (volume info) - vifFile := baseName + ".vif" - // Create basic volume info - in a real implementation, this would come from the original volume - volumeInfo := &volume_server_pb.VolumeInfo{ - Version: uint32(needle.GetCurrentVersion()), - } - if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil { - glog.Warningf("Failed to create .vif file: %v", err) - } else { - shardFiles["vif"] = vifFile - } - - glog.V(1).Infof("Generated %d EC files locally", len(shardFiles)) - return shardFiles, nil -} - -func (t *Task) copyEcShardsToDestinations() error { - if len(t.destinations) == 0 { - return fmt.Errorf("no destinations specified for EC shard distribution") - } - - destinations := t.destinations - - glog.V(1).Infof("Copying EC shards for volume %d to %d destinations", t.volumeID, len(destinations)) - - // Prepare shard IDs (0-13 for EC shards) - var shardIds []uint32 - for i := 0; i < erasure_coding.TotalShardsCount; i++ { - shardIds = append(shardIds, uint32(i)) - } - - // Distribute shards across destinations - var wg sync.WaitGroup - errorChan := make(chan error, len(destinations)) - - // Track which disks have already received metadata files (server+disk) - metadataFilesCopied := make(map[string]bool) - var metadataMutex sync.Mutex - - // For each destination, copy a subset of shards - shardsPerDest := len(shardIds) / len(destinations) - remainder := len(shardIds) % len(destinations) - - shardOffset := 0 - for i, dest := range destinations { - wg.Add(1) - - shardsForThisDest := shardsPerDest - if i < remainder { - shardsForThisDest++ // Distribute remainder shards - } - - destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest] - shardOffset += shardsForThisDest - - go func(destination *worker_pb.ECDestination, targetShardIds []uint32) { - defer wg.Done() - - if t.IsCancelled() { - errorChan <- fmt.Errorf("task cancelled during shard copy") - return - } - - // Create disk-specific metadata key (server+disk) - diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId) - - glog.V(1).Infof("Copying shards %v from %s to %s (disk %d)", - targetShardIds, t.sourceServer, destination.Node, destination.DiskId) - - // Check if this disk needs metadata files (only once per disk) - metadataMutex.Lock() - needsMetadataFiles := !metadataFilesCopied[diskKey] - if needsMetadataFiles { - metadataFilesCopied[diskKey] = true - } - metadataMutex.Unlock() - - err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt, - func(client volume_server_pb.VolumeServerClient) error { - _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ - VolumeId: uint32(t.volumeID), - Collection: t.collection, - ShardIds: targetShardIds, - CopyEcxFile: needsMetadataFiles, // Copy .ecx only once per disk - CopyEcjFile: needsMetadataFiles, // Copy .ecj only once per disk - CopyVifFile: needsMetadataFiles, // Copy .vif only once per disk - SourceDataNode: t.sourceServer, - DiskId: destination.DiskId, // Pass target disk ID - }) - return copyErr - }) - - if err != nil { - errorChan <- fmt.Errorf("failed to copy shards to %s disk %d: %v", destination.Node, destination.DiskId, err) - return - } - - if needsMetadataFiles { - glog.V(1).Infof("Successfully copied shards %v and metadata files (.ecx, .ecj, .vif) to %s disk %d", - targetShardIds, destination.Node, destination.DiskId) - } else { - glog.V(1).Infof("Successfully copied shards %v to %s disk %d (metadata files already present)", - targetShardIds, destination.Node, destination.DiskId) - } - }(dest, destShardIds) - } - - wg.Wait() - close(errorChan) - - // Check for any copy errors - if err := <-errorChan; err != nil { - return err - } - - glog.V(1).Infof("Successfully copied all EC shards for volume %d", t.volumeID) - return nil -} - -// distributeEcShardsFromWorker distributes locally generated EC shards to destination servers -func (t *Task) distributeEcShardsFromWorker(localShardFiles map[string]string) error { - if len(t.destinations) == 0 { - return fmt.Errorf("no destinations specified for EC shard distribution") - } - - destinations := t.destinations - - glog.V(1).Infof("Distributing EC shards for volume %d from worker to %d destinations", t.volumeID, len(destinations)) - - // Prepare shard IDs (0-13 for EC shards) - var shardIds []uint32 - for i := 0; i < erasure_coding.TotalShardsCount; i++ { - shardIds = append(shardIds, uint32(i)) - } - - // Distribute shards across destinations - var wg sync.WaitGroup - errorChan := make(chan error, len(destinations)) - - // Track which disks have already received metadata files (server+disk) - metadataFilesCopied := make(map[string]bool) - var metadataMutex sync.Mutex - - // For each destination, send a subset of shards - shardsPerDest := len(shardIds) / len(destinations) - remainder := len(shardIds) % len(destinations) - - shardOffset := 0 - for i, dest := range destinations { - wg.Add(1) - - shardsForThisDest := shardsPerDest - if i < remainder { - shardsForThisDest++ // Distribute remainder shards - } - - destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest] - shardOffset += shardsForThisDest - - go func(destination *worker_pb.ECDestination, targetShardIds []uint32) { - defer wg.Done() - - if t.IsCancelled() { - errorChan <- fmt.Errorf("task cancelled during shard distribution") - return - } - - // Create disk-specific metadata key (server+disk) - diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId) - - glog.V(1).Infof("Distributing shards %v from worker to %s (disk %d)", - targetShardIds, destination.Node, destination.DiskId) - - // Check if this disk needs metadata files (only once per disk) - metadataMutex.Lock() - needsMetadataFiles := !metadataFilesCopied[diskKey] - if needsMetadataFiles { - metadataFilesCopied[diskKey] = true - } - metadataMutex.Unlock() - - // Send shard files to destination using HTTP upload (simplified for now) - err := t.sendShardsToDestination(destination, targetShardIds, localShardFiles, needsMetadataFiles) - if err != nil { - errorChan <- fmt.Errorf("failed to send shards to %s disk %d: %v", destination.Node, destination.DiskId, err) - return - } - - if needsMetadataFiles { - glog.V(1).Infof("Successfully distributed shards %v and metadata files (.ecx, .vif) to %s disk %d", - targetShardIds, destination.Node, destination.DiskId) - } else { - glog.V(1).Infof("Successfully distributed shards %v to %s disk %d (metadata files already present)", - targetShardIds, destination.Node, destination.DiskId) - } - }(dest, destShardIds) - } - - wg.Wait() - close(errorChan) - - // Check for any distribution errors - if err := <-errorChan; err != nil { - return err - } - - glog.V(1).Infof("Completed distributing EC shards for volume %d", t.volumeID) - return nil -} - -// sendShardsToDestination sends specific shard files from worker to a destination server (simplified) -func (t *Task) sendShardsToDestination(destination *worker_pb.ECDestination, shardIds []uint32, localFiles map[string]string, includeMetadata bool) error { - // For now, use a simplified approach - just upload the files - // In a full implementation, this would use proper file upload mechanisms - glog.V(2).Infof("Would send shards %v and metadata=%v to %s disk %d", shardIds, includeMetadata, destination.Node, destination.DiskId) - - // TODO: Implement actual file upload to volume server - // This is a placeholder - actual implementation would: - // 1. Open each shard file locally - // 2. Upload via HTTP POST or gRPC stream to destination volume server - // 3. Volume server would save to the specified disk_id - - return nil -} - -// mountEcShardsOnDestinations mounts EC shards on all destination servers -func (t *Task) mountEcShardsOnDestinations() error { - if len(t.destinations) == 0 { - return fmt.Errorf("no destinations specified for mounting EC shards") - } - - destinations := t.destinations - - glog.V(1).Infof("Mounting EC shards for volume %d on %d destinations", t.volumeID, len(destinations)) - - // Prepare all shard IDs (0-13) - var allShardIds []uint32 - for i := 0; i < erasure_coding.TotalShardsCount; i++ { - allShardIds = append(allShardIds, uint32(i)) - } - - var wg sync.WaitGroup - errorChan := make(chan error, len(destinations)) - - // Mount shards on each destination server - for _, dest := range destinations { - wg.Add(1) - - go func(destination *worker_pb.ECDestination) { - defer wg.Done() - - if t.IsCancelled() { - errorChan <- fmt.Errorf("task cancelled during shard mounting") - return - } - - glog.V(1).Infof("Mounting EC shards on %s disk %d", destination.Node, destination.DiskId) - - err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt, - func(client volume_server_pb.VolumeServerClient) error { - _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ - VolumeId: uint32(t.volumeID), - Collection: t.collection, - ShardIds: allShardIds, // Mount all available shards on each server - }) - return mountErr - }) - - if err != nil { - // It's normal for some servers to not have all shards, so log as warning rather than error - glog.Warningf("Failed to mount some shards on %s disk %d (this may be normal): %v", destination.Node, destination.DiskId, err) - } else { - glog.V(1).Infof("Successfully mounted EC shards on %s disk %d", destination.Node, destination.DiskId) - } - }(dest) - } - - wg.Wait() - close(errorChan) - - // Check for any critical mounting errors - select { - case err := <-errorChan: - if err != nil { - glog.Warningf("Some shard mounting issues occurred: %v", err) - } - default: - // No errors - } - - glog.V(1).Infof("Completed mounting EC shards for volume %d", t.volumeID) - return nil -} - -// deleteVolumeFromAllLocations deletes the original volume from all replica servers -func (t *Task) deleteVolumeFromAllLocations(volumeId needle.VolumeId, volumeLocations []string) error { - glog.V(1).Infof("Deleting original volume %d from %d locations", volumeId, len(volumeLocations)) - - for _, location := range volumeLocations { - glog.V(1).Infof("Deleting volume %d from %s", volumeId, location) - - err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt, - func(client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ - VolumeId: uint32(volumeId), - OnlyEmpty: false, // Force delete even if not empty since we've already created EC shards - }) - return deleteErr - }) - - if err != nil { - glog.Errorf("Failed to delete volume %d from %s: %v", volumeId, location, err) - return fmt.Errorf("failed to delete volume %d from %s: %v", volumeId, location, err) - } - - glog.V(1).Infof("Successfully deleted volume %d from %s", volumeId, location) - } - - glog.V(1).Infof("Successfully deleted volume %d from all %d locations", volumeId, len(volumeLocations)) - return nil -} - -// Register the task in the global registry -func init() { - types.RegisterGlobalTypedTask(types.TaskTypeErasureCoding, NewTask) - glog.V(1).Infof("Registered EC task") -} diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go new file mode 100644 index 000000000..a6a3f749f --- /dev/null +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -0,0 +1,660 @@ +package erasure_coding + +import ( + "context" + "fmt" + "io" + "math" + "os" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" + "github.com/seaweedfs/seaweedfs/weed/worker/types" + "github.com/seaweedfs/seaweedfs/weed/worker/types/base" + "google.golang.org/grpc" +) + +// ErasureCodingTask implements the Task interface +type ErasureCodingTask struct { + *base.BaseTask + server string + volumeID uint32 + collection string + workDir string + progress float64 + + // EC parameters + dataShards int32 + parityShards int32 + destinations []*worker_pb.ECDestination + shardAssignment map[string][]string // destination -> assigned shard types + replicas []string // volume replica servers for deletion +} + +// NewErasureCodingTask creates a new unified EC task instance +func NewErasureCodingTask(id string, server string, volumeID uint32, collection string) *ErasureCodingTask { + return &ErasureCodingTask{ + BaseTask: base.NewBaseTask(id, types.TaskTypeErasureCoding), + server: server, + volumeID: volumeID, + collection: collection, + dataShards: erasure_coding.DataShardsCount, // Default values + parityShards: erasure_coding.ParityShardsCount, // Default values + } +} + +// Execute implements the UnifiedTask interface +func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error { + if params == nil { + return fmt.Errorf("task parameters are required") + } + + ecParams := params.GetErasureCodingParams() + if ecParams == nil { + return fmt.Errorf("erasure coding parameters are required") + } + + t.dataShards = ecParams.DataShards + t.parityShards = ecParams.ParityShards + t.workDir = ecParams.WorkingDir + t.destinations = ecParams.Destinations + t.replicas = params.Replicas // Get replicas from task parameters + + t.GetLogger().WithFields(map[string]interface{}{ + "volume_id": t.volumeID, + "server": t.server, + "collection": t.collection, + "data_shards": t.dataShards, + "parity_shards": t.parityShards, + "destinations": len(t.destinations), + }).Info("Starting erasure coding task") + + // Use the working directory from task parameters, or fall back to a default + baseWorkDir := t.workDir + if baseWorkDir == "" { + baseWorkDir = "/tmp/seaweedfs_ec_work" + } + + // Create unique working directory for this task + taskWorkDir := filepath.Join(baseWorkDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix())) + if err := os.MkdirAll(taskWorkDir, 0755); err != nil { + return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err) + } + glog.V(1).Infof("Created working directory: %s", taskWorkDir) + + // Update the task's working directory to the specific instance directory + t.workDir = taskWorkDir + glog.V(1).Infof("Task working directory configured: %s (logs will be written here)", taskWorkDir) + + // Ensure cleanup of working directory (but preserve logs) + defer func() { + // Clean up volume files and EC shards, but preserve the directory structure and any logs + patterns := []string{"*.dat", "*.idx", "*.ec*", "*.vif"} + for _, pattern := range patterns { + matches, err := filepath.Glob(filepath.Join(taskWorkDir, pattern)) + if err != nil { + continue + } + for _, match := range matches { + if err := os.Remove(match); err != nil { + glog.V(2).Infof("Could not remove %s: %v", match, err) + } + } + } + glog.V(1).Infof("Cleaned up volume files from working directory: %s (logs preserved)", taskWorkDir) + }() + + // Step 1: Mark volume readonly + t.ReportProgress(10.0) + t.GetLogger().Info("Marking volume readonly") + if err := t.markVolumeReadonly(); err != nil { + return fmt.Errorf("failed to mark volume readonly: %v", err) + } + + // Step 2: Copy volume files to worker + t.ReportProgress(25.0) + t.GetLogger().Info("Copying volume files to worker") + localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir) + if err != nil { + return fmt.Errorf("failed to copy volume files: %v", err) + } + + // Step 3: Generate EC shards locally + t.ReportProgress(40.0) + t.GetLogger().Info("Generating EC shards locally") + shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir) + if err != nil { + return fmt.Errorf("failed to generate EC shards: %v", err) + } + + // Step 4: Distribute shards to destinations + t.ReportProgress(60.0) + t.GetLogger().Info("Distributing EC shards to destinations") + if err := t.distributeEcShards(shardFiles); err != nil { + return fmt.Errorf("failed to distribute EC shards: %v", err) + } + + // Step 5: Mount EC shards + t.ReportProgress(80.0) + t.GetLogger().Info("Mounting EC shards") + if err := t.mountEcShards(); err != nil { + return fmt.Errorf("failed to mount EC shards: %v", err) + } + + // Step 6: Delete original volume + t.ReportProgress(90.0) + t.GetLogger().Info("Deleting original volume") + if err := t.deleteOriginalVolume(); err != nil { + return fmt.Errorf("failed to delete original volume: %v", err) + } + + t.ReportProgress(100.0) + glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed", + t.volumeID, t.server, len(shardFiles)) + + return nil +} + +// Validate implements the UnifiedTask interface +func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error { + if params == nil { + return fmt.Errorf("task parameters are required") + } + + ecParams := params.GetErasureCodingParams() + if ecParams == nil { + return fmt.Errorf("erasure coding parameters are required") + } + + if params.VolumeId != t.volumeID { + return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId) + } + + if params.Server != t.server { + return fmt.Errorf("source server mismatch: expected %s, got %s", t.server, params.Server) + } + + if ecParams.DataShards < 1 { + return fmt.Errorf("invalid data shards: %d (must be >= 1)", ecParams.DataShards) + } + + if ecParams.ParityShards < 1 { + return fmt.Errorf("invalid parity shards: %d (must be >= 1)", ecParams.ParityShards) + } + + if len(ecParams.Destinations) < int(ecParams.DataShards+ecParams.ParityShards) { + return fmt.Errorf("insufficient destinations: got %d, need %d", len(ecParams.Destinations), ecParams.DataShards+ecParams.ParityShards) + } + + return nil +} + +// EstimateTime implements the UnifiedTask interface +func (t *ErasureCodingTask) EstimateTime(params *worker_pb.TaskParams) time.Duration { + // Basic estimate based on simulated steps + return 20 * time.Second // Sum of all step durations +} + +// GetProgress returns current progress +func (t *ErasureCodingTask) GetProgress() float64 { + return t.progress +} + +// Helper methods for actual EC operations + +// markVolumeReadonly marks the volume as readonly on the source server +func (t *ErasureCodingTask) markVolumeReadonly() error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), + func(client volume_server_pb.VolumeServerClient) error { + _, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: t.volumeID, + }) + return err + }) +} + +// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker +func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) { + localFiles := make(map[string]string) + + // Copy .dat file + datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID)) + if err := t.copyFileFromSource(".dat", datFile); err != nil { + return nil, fmt.Errorf("failed to copy .dat file: %v", err) + } + localFiles["dat"] = datFile + + // Copy .idx file + idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID)) + if err := t.copyFileFromSource(".idx", idxFile); err != nil { + return nil, fmt.Errorf("failed to copy .idx file: %v", err) + } + localFiles["idx"] = idxFile + + return localFiles, nil +} + +// copyFileFromSource copies a file from source server to local path using gRPC streaming +func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(), + func(client volume_server_pb.VolumeServerClient) error { + stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + Ext: ext, + StopOffset: uint64(math.MaxInt64), + }) + if err != nil { + return fmt.Errorf("failed to initiate file copy: %v", err) + } + + // Create local file + localFile, err := os.Create(localPath) + if err != nil { + return fmt.Errorf("failed to create local file %s: %v", localPath, err) + } + defer localFile.Close() + + // Stream data and write to local file + totalBytes := int64(0) + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to receive file data: %v", err) + } + + if len(resp.FileContent) > 0 { + written, writeErr := localFile.Write(resp.FileContent) + if writeErr != nil { + return fmt.Errorf("failed to write to local file: %v", writeErr) + } + totalBytes += int64(written) + } + } + + glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath) + return nil + }) +} + +// generateEcShardsLocally generates EC shards from local volume files +func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) { + datFile := localFiles["dat"] + idxFile := localFiles["idx"] + + if datFile == "" || idxFile == "" { + return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile) + } + + // Get base name without extension for EC operations + baseName := strings.TrimSuffix(datFile, ".dat") + shardFiles := make(map[string]string) + + glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile) + + // Generate EC shard files (.ec00 ~ .ec13) + if err := erasure_coding.WriteEcFiles(baseName); err != nil { + return nil, fmt.Errorf("failed to generate EC shard files: %v", err) + } + + // Generate .ecx file from .idx (use baseName, not full idx path) + if err := erasure_coding.WriteSortedFileFromIdx(baseName, ".ecx"); err != nil { + return nil, fmt.Errorf("failed to generate .ecx file: %v", err) + } + + // Collect generated shard file paths + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardFile := fmt.Sprintf("%s.ec%02d", baseName, i) + if _, err := os.Stat(shardFile); err == nil { + shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile + } + } + + // Add metadata files + ecxFile := baseName + ".ecx" + if _, err := os.Stat(ecxFile); err == nil { + shardFiles["ecx"] = ecxFile + } + + // Generate .vif file (volume info) + vifFile := baseName + ".vif" + volumeInfo := &volume_server_pb.VolumeInfo{ + Version: uint32(needle.GetCurrentVersion()), + } + if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil { + glog.Warningf("Failed to create .vif file: %v", err) + } else { + shardFiles["vif"] = vifFile + } + + glog.V(1).Infof("Generated %d EC files locally", len(shardFiles)) + return shardFiles, nil +} + +// distributeEcShards distributes locally generated EC shards to destination servers +func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error { + if len(t.destinations) == 0 { + return fmt.Errorf("no destinations specified for EC shard distribution") + } + + if len(shardFiles) == 0 { + return fmt.Errorf("no shard files available for distribution") + } + + // Create shard assignment: assign specific shards to specific destinations + shardAssignment := t.createShardAssignment(shardFiles) + if len(shardAssignment) == 0 { + return fmt.Errorf("failed to create shard assignment") + } + + // Store assignment for use during mounting + t.shardAssignment = shardAssignment + + // Send assigned shards to each destination + for destNode, assignedShards := range shardAssignment { + t.GetLogger().WithFields(map[string]interface{}{ + "destination": destNode, + "assigned_shards": len(assignedShards), + "shard_ids": assignedShards, + }).Info("Distributing assigned EC shards to destination") + + // Send only the assigned shards to this destination + for _, shardType := range assignedShards { + filePath, exists := shardFiles[shardType] + if !exists { + return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode) + } + + if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil { + return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err) + } + } + } + + glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment)) + return nil +} + +// createShardAssignment assigns specific EC shards to specific destination servers +// Each destination gets a subset of shards based on availability and placement rules +func (t *ErasureCodingTask) createShardAssignment(shardFiles map[string]string) map[string][]string { + assignment := make(map[string][]string) + + // Collect all available EC shards (ec00-ec13) + var availableShards []string + for shardType := range shardFiles { + if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 { + availableShards = append(availableShards, shardType) + } + } + + // Sort shards for consistent assignment + sort.Strings(availableShards) + + if len(availableShards) == 0 { + glog.Warningf("No EC shards found for assignment") + return assignment + } + + // Calculate shards per destination + numDestinations := len(t.destinations) + if numDestinations == 0 { + return assignment + } + + // Strategy: Distribute shards as evenly as possible across destinations + // With 14 shards and N destinations, some destinations get ā14/Nā shards, others get ā14/Nā + shardsPerDest := len(availableShards) / numDestinations + extraShards := len(availableShards) % numDestinations + + shardIndex := 0 + for i, dest := range t.destinations { + var destShards []string + + // Assign base number of shards + shardsToAssign := shardsPerDest + + // Assign one extra shard to first 'extraShards' destinations + if i < extraShards { + shardsToAssign++ + } + + // Assign the shards + for j := 0; j < shardsToAssign && shardIndex < len(availableShards); j++ { + destShards = append(destShards, availableShards[shardIndex]) + shardIndex++ + } + + assignment[dest.Node] = destShards + + glog.V(2).Infof("Assigned shards %v to destination %s", destShards, dest.Node) + } + + // Assign metadata files (.ecx, .vif) to each destination that has shards + // Note: .ecj files are created during mount, not during initial generation + for destNode, destShards := range assignment { + if len(destShards) > 0 { + // Add .ecx file if available + if _, hasEcx := shardFiles["ecx"]; hasEcx { + assignment[destNode] = append(assignment[destNode], "ecx") + } + + // Add .vif file if available + if _, hasVif := shardFiles["vif"]; hasVif { + assignment[destNode] = append(assignment[destNode], "vif") + } + + glog.V(2).Infof("Assigned metadata files (.ecx, .vif) to destination %s", destNode) + } + } + + return assignment +} + +// sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API +func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, shardType string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), grpc.WithInsecure(), + func(client volume_server_pb.VolumeServerClient) error { + // Open the local shard file + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open shard file %s: %v", filePath, err) + } + defer file.Close() + + // Get file size + fileInfo, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to get file info for %s: %v", filePath, err) + } + + // Determine file extension and shard ID + var ext string + var shardId uint32 + if shardType == "ecx" { + ext = ".ecx" + shardId = 0 // ecx file doesn't have a specific shard ID + } else if shardType == "vif" { + ext = ".vif" + shardId = 0 // vif file doesn't have a specific shard ID + } else if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 { + // EC shard file like "ec00", "ec01", etc. + ext = "." + shardType + fmt.Sscanf(shardType[2:], "%d", &shardId) + } else { + return fmt.Errorf("unknown shard type: %s", shardType) + } + + // Create streaming client + stream, err := client.ReceiveFile(context.Background()) + if err != nil { + return fmt.Errorf("failed to create receive stream: %v", err) + } + + // Send file info first + err = stream.Send(&volume_server_pb.ReceiveFileRequest{ + Data: &volume_server_pb.ReceiveFileRequest_Info{ + Info: &volume_server_pb.ReceiveFileInfo{ + VolumeId: t.volumeID, + Ext: ext, + Collection: t.collection, + IsEcVolume: true, + ShardId: shardId, + FileSize: uint64(fileInfo.Size()), + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to send file info: %v", err) + } + + // Send file content in chunks + buffer := make([]byte, 64*1024) // 64KB chunks + for { + n, readErr := file.Read(buffer) + if n > 0 { + err = stream.Send(&volume_server_pb.ReceiveFileRequest{ + Data: &volume_server_pb.ReceiveFileRequest_FileContent{ + FileContent: buffer[:n], + }, + }) + if err != nil { + return fmt.Errorf("failed to send file content: %v", err) + } + } + if readErr == io.EOF { + break + } + if readErr != nil { + return fmt.Errorf("failed to read file: %v", readErr) + } + } + + // Close stream and get response + resp, err := stream.CloseAndRecv() + if err != nil { + return fmt.Errorf("failed to close stream: %v", err) + } + + if resp.Error != "" { + return fmt.Errorf("server error: %s", resp.Error) + } + + glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer) + return nil + }) +} + +// mountEcShards mounts EC shards on destination servers +func (t *ErasureCodingTask) mountEcShards() error { + if t.shardAssignment == nil { + return fmt.Errorf("shard assignment not available for mounting") + } + + // Mount only assigned shards on each destination + for destNode, assignedShards := range t.shardAssignment { + // Convert shard names to shard IDs for mounting + var shardIds []uint32 + for _, shardType := range assignedShards { + // Skip metadata files (.ecx, .vif) - only mount EC shards + if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 { + // Parse shard ID from "ec00", "ec01", etc. + var shardId uint32 + if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err == nil { + shardIds = append(shardIds, shardId) + } + } + } + + if len(shardIds) == 0 { + glog.V(1).Infof("No EC shards to mount on %s (only metadata files)", destNode) + continue + } + + glog.V(1).Infof("Mounting shards %v on %s", shardIds, destNode) + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(), + func(client volume_server_pb.VolumeServerClient) error { + _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + ShardIds: shardIds, + }) + return mountErr + }) + + if err != nil { + glog.Warningf("Failed to mount shards %v on %s: %v", shardIds, destNode, err) + } else { + glog.V(1).Infof("Successfully mounted EC shards %v on %s", shardIds, destNode) + } + } + + return nil +} + +// deleteOriginalVolume deletes the original volume and all its replicas from all servers +func (t *ErasureCodingTask) deleteOriginalVolume() error { + // Get replicas from task parameters (set during detection) + replicas := t.getReplicas() + + if len(replicas) == 0 { + glog.Warningf("No replicas found for volume %d, falling back to source server only", t.volumeID) + replicas = []string{t.server} + } + + glog.V(1).Infof("Deleting volume %d from %d replica servers: %v", t.volumeID, len(replicas), replicas) + + // Delete volume from all replica locations + var deleteErrors []string + successCount := 0 + + for _, replicaServer := range replicas { + err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), grpc.WithInsecure(), + func(client volume_server_pb.VolumeServerClient) error { + _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ + VolumeId: t.volumeID, + OnlyEmpty: false, // Force delete since we've created EC shards + }) + return err + }) + + if err != nil { + deleteErrors = append(deleteErrors, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, replicaServer, err)) + glog.Warningf("Failed to delete volume %d from replica server %s: %v", t.volumeID, replicaServer, err) + } else { + successCount++ + glog.V(1).Infof("Successfully deleted volume %d from replica server %s", t.volumeID, replicaServer) + } + } + + // Report results + if len(deleteErrors) > 0 { + glog.Warningf("Some volume deletions failed (%d/%d successful): %v", successCount, len(replicas), deleteErrors) + // Don't return error - EC task should still be considered successful if shards are mounted + } else { + glog.V(1).Infof("Successfully deleted volume %d from all %d replica servers", t.volumeID, len(replicas)) + } + + return nil +} + +// getReplicas extracts replica servers from task parameters +func (t *ErasureCodingTask) getReplicas() []string { + // Access replicas from the parameters passed during Execute + // We'll need to store these during Execute - let me add a field to the task + return t.replicas +} diff --git a/weed/worker/tasks/erasure_coding/monitoring.go b/weed/worker/tasks/erasure_coding/monitoring.go new file mode 100644 index 000000000..799eb62c8 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/monitoring.go @@ -0,0 +1,229 @@ +package erasure_coding + +import ( + "sync" + "time" +) + +// ErasureCodingMetrics contains erasure coding-specific monitoring data +type ErasureCodingMetrics struct { + // Execution metrics + VolumesEncoded int64 `json:"volumes_encoded"` + TotalShardsCreated int64 `json:"total_shards_created"` + TotalDataProcessed int64 `json:"total_data_processed"` + TotalSourcesRemoved int64 `json:"total_sources_removed"` + LastEncodingTime time.Time `json:"last_encoding_time"` + + // Performance metrics + AverageEncodingTime int64 `json:"average_encoding_time_seconds"` + AverageShardSize int64 `json:"average_shard_size"` + AverageDataShards int `json:"average_data_shards"` + AverageParityShards int `json:"average_parity_shards"` + SuccessfulOperations int64 `json:"successful_operations"` + FailedOperations int64 `json:"failed_operations"` + + // Distribution metrics + ShardsPerDataCenter map[string]int64 `json:"shards_per_datacenter"` + ShardsPerRack map[string]int64 `json:"shards_per_rack"` + PlacementSuccessRate float64 `json:"placement_success_rate"` + + // Current task metrics + CurrentVolumeSize int64 `json:"current_volume_size"` + CurrentShardCount int `json:"current_shard_count"` + VolumesPendingEncoding int `json:"volumes_pending_encoding"` + + mutex sync.RWMutex +} + +// NewErasureCodingMetrics creates a new erasure coding metrics instance +func NewErasureCodingMetrics() *ErasureCodingMetrics { + return &ErasureCodingMetrics{ + LastEncodingTime: time.Now(), + ShardsPerDataCenter: make(map[string]int64), + ShardsPerRack: make(map[string]int64), + } +} + +// RecordVolumeEncoded records a successful volume encoding operation +func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.VolumesEncoded++ + m.TotalShardsCreated += int64(shardsCreated) + m.TotalDataProcessed += volumeSize + m.SuccessfulOperations++ + m.LastEncodingTime = time.Now() + + if sourceRemoved { + m.TotalSourcesRemoved++ + } + + // Update average encoding time + if m.AverageEncodingTime == 0 { + m.AverageEncodingTime = int64(encodingTime.Seconds()) + } else { + // Exponential moving average + newTime := int64(encodingTime.Seconds()) + m.AverageEncodingTime = (m.AverageEncodingTime*4 + newTime) / 5 + } + + // Update average shard size + if shardsCreated > 0 { + avgShardSize := volumeSize / int64(shardsCreated) + if m.AverageShardSize == 0 { + m.AverageShardSize = avgShardSize + } else { + m.AverageShardSize = (m.AverageShardSize*4 + avgShardSize) / 5 + } + } + + // Update average data/parity shards + if m.AverageDataShards == 0 { + m.AverageDataShards = dataShards + m.AverageParityShards = parityShards + } else { + m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5 + m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5 + } +} + +// RecordFailure records a failed erasure coding operation +func (m *ErasureCodingMetrics) RecordFailure() { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.FailedOperations++ +} + +// RecordShardPlacement records shard placement for distribution tracking +func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.ShardsPerDataCenter[dataCenter]++ + rackKey := dataCenter + ":" + rack + m.ShardsPerRack[rackKey]++ +} + +// UpdateCurrentVolumeInfo updates current volume processing information +func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.CurrentVolumeSize = volumeSize + m.CurrentShardCount = shardCount +} + +// SetVolumesPendingEncoding sets the number of volumes pending encoding +func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.VolumesPendingEncoding = count +} + +// UpdatePlacementSuccessRate updates the placement success rate +func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.PlacementSuccessRate == 0 { + m.PlacementSuccessRate = rate + } else { + // Exponential moving average + m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate + } +} + +// GetMetrics returns a copy of the current metrics (without the mutex) +func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics { + m.mutex.RLock() + defer m.mutex.RUnlock() + + // Create deep copy of maps + shardsPerDC := make(map[string]int64) + for k, v := range m.ShardsPerDataCenter { + shardsPerDC[k] = v + } + + shardsPerRack := make(map[string]int64) + for k, v := range m.ShardsPerRack { + shardsPerRack[k] = v + } + + // Create a copy without the mutex to avoid copying lock value + return ErasureCodingMetrics{ + VolumesEncoded: m.VolumesEncoded, + TotalShardsCreated: m.TotalShardsCreated, + TotalDataProcessed: m.TotalDataProcessed, + TotalSourcesRemoved: m.TotalSourcesRemoved, + LastEncodingTime: m.LastEncodingTime, + AverageEncodingTime: m.AverageEncodingTime, + AverageShardSize: m.AverageShardSize, + AverageDataShards: m.AverageDataShards, + AverageParityShards: m.AverageParityShards, + SuccessfulOperations: m.SuccessfulOperations, + FailedOperations: m.FailedOperations, + ShardsPerDataCenter: shardsPerDC, + ShardsPerRack: shardsPerRack, + PlacementSuccessRate: m.PlacementSuccessRate, + CurrentVolumeSize: m.CurrentVolumeSize, + CurrentShardCount: m.CurrentShardCount, + VolumesPendingEncoding: m.VolumesPendingEncoding, + } +} + +// GetSuccessRate returns the success rate as a percentage +func (m *ErasureCodingMetrics) GetSuccessRate() float64 { + m.mutex.RLock() + defer m.mutex.RUnlock() + + total := m.SuccessfulOperations + m.FailedOperations + if total == 0 { + return 100.0 + } + return float64(m.SuccessfulOperations) / float64(total) * 100.0 +} + +// GetAverageDataProcessed returns the average data processed per volume +func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 { + m.mutex.RLock() + defer m.mutex.RUnlock() + + if m.VolumesEncoded == 0 { + return 0 + } + return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded) +} + +// GetSourceRemovalRate returns the percentage of sources removed after encoding +func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 { + m.mutex.RLock() + defer m.mutex.RUnlock() + + if m.VolumesEncoded == 0 { + return 0 + } + return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0 +} + +// Reset resets all metrics to zero +func (m *ErasureCodingMetrics) Reset() { + m.mutex.Lock() + defer m.mutex.Unlock() + + *m = ErasureCodingMetrics{ + LastEncodingTime: time.Now(), + ShardsPerDataCenter: make(map[string]int64), + ShardsPerRack: make(map[string]int64), + } +} + +// Global metrics instance for erasure coding tasks +var globalErasureCodingMetrics = NewErasureCodingMetrics() + +// GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance +func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics { + return globalErasureCodingMetrics +} diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/register.go index 62cfe6b56..883aaf965 100644 --- a/weed/worker/tasks/erasure_coding/ec_register.go +++ b/weed/worker/tasks/erasure_coding/register.go @@ -5,6 +5,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" @@ -35,9 +36,19 @@ func RegisterErasureCodingTask() { Icon: "fas fa-shield-alt text-success", Capabilities: []string{"erasure_coding", "data_protection"}, - Config: config, - ConfigSpec: GetConfigSpec(), - CreateTask: nil, // Uses typed task system - see init() in ec.go + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) { + if params == nil { + return nil, fmt.Errorf("task parameters are required") + } + return NewErasureCodingTask( + fmt.Sprintf("erasure_coding-%d", params.VolumeId), + params.Server, + params.VolumeId, + params.Collection, + ), nil + }, DetectionFunc: Detection, ScanInterval: 1 * time.Hour, SchedulingFunc: Scheduling, diff --git a/weed/worker/tasks/erasure_coding/scheduling.go b/weed/worker/tasks/erasure_coding/scheduling.go new file mode 100644 index 000000000..d9d891e04 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/scheduling.go @@ -0,0 +1,40 @@ +package erasure_coding + +import ( + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Scheduling implements the scheduling logic for erasure coding tasks +func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool { + ecConfig := config.(*Config) + + // Check if we have available workers + if len(availableWorkers) == 0 { + return false + } + + // Count running EC tasks + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeErasureCoding { + runningCount++ + } + } + + // Check concurrency limit + if runningCount >= ecConfig.MaxConcurrent { + return false + } + + // Check if any worker can handle EC tasks + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeErasureCoding { + return true + } + } + } + + return false +} |
