aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/erasure_coding
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/erasure_coding')
-rw-r--r--weed/worker/tasks/erasure_coding/detection.go314
-rw-r--r--weed/worker/tasks/erasure_coding/ec.go785
-rw-r--r--weed/worker/tasks/erasure_coding/ec_task.go660
-rw-r--r--weed/worker/tasks/erasure_coding/monitoring.go229
-rw-r--r--weed/worker/tasks/erasure_coding/register.go (renamed from weed/worker/tasks/erasure_coding/ec_register.go)17
-rw-r--r--weed/worker/tasks/erasure_coding/scheduling.go40
6 files changed, 1238 insertions, 807 deletions
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index 450080a12..1122d2721 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -5,7 +5,10 @@ import (
"strings"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -69,6 +72,38 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
ScheduleAt: now,
}
+
+ // Plan EC destinations if ActiveTopology is available
+ if clusterInfo.ActiveTopology != nil {
+ multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
+ if err != nil {
+ glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
+ continue // Skip this volume if destination planning fails
+ }
+
+ // Find all volume replicas from topology
+ replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+ glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
+
+ // Create typed parameters with EC destination information and replicas
+ result.TypedParams = &worker_pb.TaskParams{
+ VolumeId: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ VolumeSize: metric.Size, // Store original volume size for tracking changes
+ Replicas: replicas, // Include all volume replicas for deletion
+ TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
+ ErasureCodingParams: createECTaskParams(multiPlan),
+ },
+ }
+
+ glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
+ metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
+ } else {
+ glog.Warningf("No ActiveTopology available for destination planning in EC detection")
+ continue // Skip this volume if no topology available
+ }
+
results = append(results, result)
} else {
// Count debug reasons
@@ -105,36 +140,277 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
return results, nil
}
-// Scheduling implements the scheduling logic for erasure coding tasks
-func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
- ecConfig := config.(*Config)
+// planECDestinations builds the multi-destination plan (target disks plus rack/DC diversity counts) for erasure coding the volume described by metric.
+// It fails when fewer than minTotalDisks disks are available or selected; ecConfig is accepted but not read anywhere in this function.
+func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
+ // Rack and data center of the node named by metric.Server; both stay empty if that node is not found
+ var sourceRack, sourceDC string
- // Check if we have available workers
- if len(availableWorkers) == 0 {
- return false
+ // Walk DC -> rack -> node looking for the source server
+ topologyInfo := activeTopology.GetTopologyInfo()
+ if topologyInfo != nil {
+ for _, dc := range topologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, dataNodeInfo := range rack.DataNodeInfos {
+ if dataNodeInfo.Id == metric.Server {
+ sourceDC = dc.Id
+ sourceRack = rack.Id
+ break // leaves only the node loop; the checks below unwind the outer loops
+ }
+ }
+ if sourceRack != "" {
+ break
+ }
+ }
+ if sourceDC != "" {
+ break
+ }
+ }
+ }
+
+ // Minimum number of destination disks required for a plan; hard-coded here rather than taken from ecConfig
+ minTotalDisks := 4
+
+ // Get available disks for EC placement (include source node for EC)
+ availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
+ if len(availableDisks) < minTotalDisks {
+ return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", minTotalDisks, len(availableDisks))
+ }
+
+ // Select up to erasure_coding.TotalShardsCount disks, favouring rack/DC diversity
+ selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
+ if len(selectedDisks) < minTotalDisks {
+ return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), minTotalDisks)
+ }
+
+ var plans []*topology.DestinationPlan
+ rackCount := make(map[string]int) // keyed by "DC:rack"
+ dcCount := make(map[string]int) // keyed by data center
+
+ for _, disk := range selectedDisks {
+ plan := &topology.DestinationPlan{
+ TargetNode: disk.NodeID,
+ TargetDisk: disk.DiskID,
+ TargetRack: disk.Rack,
+ TargetDC: disk.DataCenter,
+ ExpectedSize: 0, // EC shards don't have predetermined size
+ PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
+ Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC),
+ }
+ plans = append(plans, plan)
+
+ // Tally the selected disks per rack and per DC; only the number of distinct keys is reported below
+ rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+ rackCount[rackKey]++
+ dcCount[disk.DataCenter]++
+ }
+
+ return &topology.MultiDestinationPlan{
+ Plans: plans,
+ TotalShards: len(plans), // one plan entry per selected disk
+ SuccessfulRack: len(rackCount), // number of distinct racks used
+ SuccessfulDCs: len(dcCount), // number of distinct data centers used
+ }, nil
+}
+
+// createECTaskParams converts a multi-destination plan into the protobuf
+// parameters of an erasure coding task. Every plan entry becomes one
+// ECDestination, and the conflicts of all entries are flattened, in plan
+// order, into PlacementConflicts. Shard counts are the package constants.
+func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
+	var (
+		targets   []*worker_pb.ECDestination
+		conflicts []string
+	)
+
+	// One pass fills both slices; each keeps the plan order.
+	for _, entry := range multiPlan.Plans {
+		targets = append(targets, &worker_pb.ECDestination{
+			Node:           entry.TargetNode,
+			DiskId:         entry.TargetDisk,
+			Rack:           entry.TargetRack,
+			DataCenter:     entry.TargetDC,
+			PlacementScore: entry.PlacementScore,
+		})
+		conflicts = append(conflicts, entry.Conflicts...)
+	}
+
+	return &worker_pb.ErasureCodingTaskParams{
+		Destinations:       targets,
+		DataShards:         erasure_coding.DataShardsCount,   // Standard data shards
+		ParityShards:       erasure_coding.ParityShardsCount, // Standard parity shards
+		PlacementConflicts: conflicts,
+	}
+}
+
+// selectBestECDestinations picks up to shardsNeeded disks for EC shard
+// placement, spreading them over as many racks as possible.
+//
+// Racks are visited in the order in which they first appear in disks, so the
+// same input always yields the same selection. Ranging over the rack map
+// directly would make the chosen racks depend on Go's randomized map
+// iteration order whenever there are more racks than shards needed.
+//
+// The first pass takes the best suitable disk of each rack. The second pass
+// tops the selection up with any remaining suitable disks, in input order.
+// The result may hold fewer than shardsNeeded disks and is nil when disks is
+// empty.
+func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
+	if len(disks) == 0 {
+		return nil
+	}
+
+	// Group disks by "DC:rack" and remember the first-appearance order of racks.
+	rackGroups := make(map[string][]*topology.DiskInfo)
+	var rackOrder []string
+	for _, disk := range disks {
+		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+		if _, seen := rackGroups[rackKey]; !seen {
+			rackOrder = append(rackOrder, rackKey)
+		}
+		rackGroups[rackKey] = append(rackGroups[rackKey], disk)
+	}
+
+	var selected []*topology.DiskInfo
+
+	// chosen records already selected disks by node and disk ID, replacing a
+	// linear scan of selected for every candidate.
+	chosen := make(map[string]bool)
+	diskKey := func(d *topology.DiskInfo) string {
+		return fmt.Sprintf("%v:%v", d.NodeID, d.DiskID)
+	}
+
+	// First pass: one disk from each rack for maximum diversity.
+	for _, rackKey := range rackOrder {
+		if len(selected) >= shardsNeeded {
+			break
+		}
+		if best := selectBestFromRack(rackGroups[rackKey], sourceRack, sourceDC); best != nil {
+			selected = append(selected, best)
+			chosen[diskKey(best)] = true
+		}
+	}
+
+	// Second pass: fill the remaining slots from racks that are already used.
+	for _, disk := range disks {
+		if len(selected) >= shardsNeeded {
+			break
+		}
+		key := diskKey(disk)
+		if chosen[key] || !isDiskSuitableForEC(disk) {
+			continue
+		}
+		selected = append(selected, disk)
+		chosen[key] = true
+	}
+
+	return selected
+}
+
+// selectBestFromRack returns the suitable disk (per isDiskSuitableForEC) with the highest calculateECScore, or nil when disks is empty or none is suitable
+func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
+ if len(disks) == 0 {
+ return nil
}
- // Count running EC tasks
- runningCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeErasureCoding {
- runningCount++
+ var bestDisk *topology.DiskInfo
+ bestScore := -1.0 // sentinel; a suitable disk replaces it as soon as its score exceeds -1
+
+ for _, disk := range disks {
+ if !isDiskSuitableForEC(disk) {
+ continue
+ }
+
+ score := calculateECScore(disk, sourceRack, sourceDC)
+ if score > bestScore { // strict comparison: on a tie the earlier disk is kept
+ bestScore = score
+ bestDisk = disk
}
}
- // Check concurrency limit
- if runningCount >= ecConfig.MaxConcurrent {
+ return bestDisk
+}
+
+// calculateECScore calculates placement score for EC operations
+func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
+ if disk.DiskInfo == nil {
+ return 0.0
+ }
+
+ score := 0.0
+
+ // Prefer disks with available capacity
+ if disk.DiskInfo.MaxVolumeCount > 0 {
+ utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
+ score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity
+ }
+
+ // Prefer different racks for better distribution
+ if disk.Rack != sourceRack {
+ score += 30.0
+ }
+
+ // Prefer different data centers for better distribution
+ if disk.DataCenter != sourceDC {
+ score += 20.0
+ }
+
+ // Consider current load
+ score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
+
+ return score
+}
+
+// isDiskSuitableForEC reports whether a disk may receive EC shards: it must carry DiskInfo, have VolumeCount below MaxVolumeCount, and have LoadCount of at most 10
+func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
+ if disk.DiskInfo == nil {
+ return false
+ }
+
+ // Reject disks whose volume count has reached (or exceeded) the configured maximum
+ if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount {
+ return false
+ }
+
+ // Reject disks that already carry more than 10 units of load
+ if disk.LoadCount > 10 { // Arbitrary threshold
return false
}
- // Check if any worker can handle EC tasks
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeErasureCoding {
- return true
+ return true
+}
+
+// checkECPlacementConflicts checks for placement rule conflicts in EC operations
+func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
+ var conflicts []string
+
+ // For EC, being on the same rack as source is often acceptable
+ // but we note it as potential conflict for monitoring
+ if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
+ conflicts = append(conflicts, "same_rack_as_source")
+ }
+
+ return conflicts
+}
+
+// findVolumeReplicas returns the IDs of all data nodes in the topology holding a volume with the given ID and collection; the result is empty without topology data and nil when nothing matches
+func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
+ if activeTopology == nil {
+ return []string{}
+ }
+
+ topologyInfo := activeTopology.GetTopologyInfo()
+ if topologyInfo == nil {
+ return []string{}
+ }
+
+ var replicaServers []string
+
+ // Walk DC -> rack -> node -> disk -> volume looking for matching volumes
+ for _, dc := range topologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, nodeInfo := range rack.DataNodeInfos {
+ for _, diskInfo := range nodeInfo.DiskInfos {
+ for _, volumeInfo := range diskInfo.VolumeInfos {
+ if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
+ replicaServers = append(replicaServers, nodeInfo.Id)
+ break // Leaves only the volume loop: the node's remaining disks are still scanned, so a node is listed once per disk holding the volume
+ }
+ }
+ }
}
}
}
- return false
+ return replicaServers
}
diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go
deleted file mode 100644
index 8dc7a1cd0..000000000
--- a/weed/worker/tasks/erasure_coding/ec.go
+++ /dev/null
@@ -1,785 +0,0 @@
-package erasure_coding
-
-import (
- "context"
- "fmt"
- "io"
- "math"
- "os"
- "path/filepath"
- "strings"
- "sync"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/operation"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
-)
-
-// Task is the erasure coding task; its fields are filled from protobuf task parameters in ExecuteTyped, with defaults set by NewTask
-type Task struct {
- *base.BaseTypedTask
-
- // Current task state
- sourceServer string // volume server holding the volume (params.Server)
- volumeID uint32
- collection string
- workDir string // parent of the per-task working directories
- masterClient string // master address; NewTask defaults it to "localhost:9333"
- grpcDialOpt grpc.DialOption // dial option used for all volume server calls
-
- // EC parameters from protobuf
- destinations []*worker_pb.ECDestination // Disk-aware destinations
- existingShardLocations []*worker_pb.ExistingECShardLocation // Existing shards to cleanup
- estimatedShardSize uint64
- dataShards int // fixed to erasure_coding.DataShardsCount by NewTask
- parityShards int // fixed to erasure_coding.ParityShardsCount by NewTask
- cleanupSource bool
-
- // Progress tracking
- currentStep string
- stepProgress map[string]float64
-}
-
-// NewTask returns an erasure coding task preloaded with defaults: a local
-// master address, a work directory under /tmp, insecure gRPC transport
-// credentials and the package's fixed data/parity shard counts.
-func NewTask() types.TypedTaskInterface {
-	return &Task{
-		BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeErasureCoding),
-
-		// Connection and filesystem defaults.
-		masterClient: "localhost:9333",
-		workDir:      "/tmp/seaweedfs_ec_work",
-		grpcDialOpt:  grpc.WithTransportCredentials(insecure.NewCredentials()),
-
-		// Shard layout always comes from the package constants.
-		dataShards:   erasure_coding.DataShardsCount,
-		parityShards: erasure_coding.ParityShardsCount,
-
-		stepProgress: make(map[string]float64),
-	}
-}
-
-// ValidateTyped checks the typed parameters of an EC task. After the base
-// validation it requires EC parameters with at least one destination, rejects
-// explicitly set shard counts that differ from the package constants, and
-// requires at least as many destinations as there are shards in total.
-func (t *Task) ValidateTyped(params *worker_pb.TaskParams) error {
-	if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
-		return err
-	}
-
-	ecParams := params.GetErasureCodingParams()
-	switch {
-	case ecParams == nil:
-		return fmt.Errorf("erasure_coding_params is required for EC task")
-	case len(ecParams.Destinations) == 0:
-		return fmt.Errorf("destinations must be specified for EC task")
-	}
-
-	// Shard counts are fixed; a zero value means "not specified" and is accepted.
-	wantData := int32(erasure_coding.DataShardsCount)
-	wantParity := int32(erasure_coding.ParityShardsCount)
-
-	if got := ecParams.DataShards; got > 0 && got != wantData {
-		return fmt.Errorf("data_shards must be %d (fixed constant), got %d", wantData, got)
-	}
-	if got := ecParams.ParityShards; got > 0 && got != wantParity {
-		return fmt.Errorf("parity_shards must be %d (fixed constant), got %d", wantParity, got)
-	}
-
-	// There must be at least one destination per shard.
-	needed := wantData + wantParity
-	if have := len(ecParams.Destinations); int32(have) < needed {
-		return fmt.Errorf("insufficient destinations: need %d, have %d", needed, have)
-	}
-
-	return nil
-}
-
-// EstimateTimeTyped estimates the time needed for EC processing based on protobuf parameters
-func (t *Task) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
- baseTime := 20 * time.Minute // Processing takes time due to comprehensive operations
-
- ecParams := params.GetErasureCodingParams()
- if ecParams != nil && ecParams.EstimatedShardSize > 0 {
- // More accurate estimate based on shard size
- // Account for copying, encoding, and distribution
- gbSize := ecParams.EstimatedShardSize / (1024 * 1024 * 1024)
- estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB
- if estimatedTime > baseTime {
- return estimatedTime
- }
- }
-
- return baseTime
-}
-
-// ExecuteTyped runs the EC workflow for one volume: mark it readonly, copy it to the worker, encode, distribute and mount the shards, then delete the original
-func (t *Task) ExecuteTyped(params *worker_pb.TaskParams) error {
- // Extract basic parameters
- t.volumeID = params.VolumeId
- t.sourceServer = params.Server
- t.collection = params.Collection
-
- // Extract EC-specific parameters
- ecParams := params.GetErasureCodingParams()
- if ecParams != nil {
- t.destinations = ecParams.Destinations // Store disk-aware destinations
- t.existingShardLocations = ecParams.ExistingShardLocations // Store existing shards for cleanup
- t.estimatedShardSize = ecParams.EstimatedShardSize
- t.cleanupSource = ecParams.CleanupSource
-
- // DataShards and ParityShards are constants, don't override from parameters
- // t.dataShards and t.parityShards are already set to constants in NewTask
-
- if ecParams.WorkingDir != "" {
- t.workDir = ecParams.WorkingDir
- }
- if ecParams.MasterClient != "" {
- t.masterClient = ecParams.MasterClient
- }
- }
-
- // Determine available destinations for logging
- var availableDestinations []string
- for _, dest := range t.destinations {
- availableDestinations = append(availableDestinations, fmt.Sprintf("%s(disk:%d)", dest.Node, dest.DiskId))
- }
-
- glog.V(1).Infof("Starting EC task for volume %d: %s -> %v (data:%d, parity:%d)",
- t.volumeID, t.sourceServer, availableDestinations, t.dataShards, t.parityShards)
-
- // Create a working directory for this task, named after the volume ID and the current Unix time in seconds
- taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
- if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
- return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
- }
- glog.V(1).Infof("WORKFLOW: Created working directory: %s", taskWorkDir)
-
- // Remove the working directory on every return path, successful or not
- defer func() {
- if err := os.RemoveAll(taskWorkDir); err != nil {
- glog.Warningf("Failed to cleanup working directory %s: %v", taskWorkDir, err)
- } else {
- glog.V(1).Infof("WORKFLOW: Cleaned up working directory: %s", taskWorkDir)
- }
- }()
-
- // Step 1: Collect volume locations from master
- glog.V(1).Infof("WORKFLOW STEP 1: Collecting volume locations from master")
- t.SetProgress(5.0)
- volumeId := needle.VolumeId(t.volumeID)
- volumeLocations, err := t.collectVolumeLocations(volumeId)
- if err != nil {
- return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err)
- }
- glog.V(1).Infof("WORKFLOW: Found volume %d on %d servers: %v", t.volumeID, len(volumeLocations), volumeLocations)
-
- // Convert ServerAddress slice to string slice
- var locationStrings []string
- for _, addr := range volumeLocations {
- locationStrings = append(locationStrings, string(addr))
- }
-
- // Step 2: Finish early, reporting success, when shouldPerformECEncoding rejects the volume
- if !t.shouldPerformECEncoding(locationStrings) {
- glog.Infof("Volume %d does not meet EC encoding criteria, skipping", t.volumeID)
- t.SetProgress(100.0)
- return nil
- }
-
- // Step 2A: Cleanup existing EC shards if any
- glog.V(1).Infof("WORKFLOW STEP 2A: Cleaning up existing EC shards for volume %d", t.volumeID)
- t.SetProgress(10.0)
- err = t.cleanupExistingEcShards()
- if err != nil {
- glog.Warningf("Failed to cleanup existing EC shards (continuing anyway): %v", err)
- // Don't fail the task - this is just cleanup
- }
- glog.V(1).Infof("WORKFLOW: Existing EC shards cleanup completed for volume %d", t.volumeID)
-
- // Step 2B: Mark volume readonly on all servers
- glog.V(1).Infof("WORKFLOW STEP 2B: Marking volume %d readonly on all replica servers", t.volumeID)
- t.SetProgress(15.0)
- err = t.markVolumeReadonlyOnAllReplicas(needle.VolumeId(t.volumeID), locationStrings)
- if err != nil {
- return fmt.Errorf("failed to mark volume readonly: %v", err)
- }
- glog.V(1).Infof("WORKFLOW: Volume %d marked readonly on all replicas", t.volumeID)
-
- // Step 3: Copy volume files (.dat, .idx) to EC worker
- glog.V(1).Infof("WORKFLOW STEP 3: Copying volume files from source server %s to EC worker", t.sourceServer)
- t.SetProgress(25.0)
- localVolumeFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
- if err != nil {
- return fmt.Errorf("failed to copy volume files to EC worker: %v", err)
- }
- glog.V(1).Infof("WORKFLOW: Volume files copied to EC worker: %v", localVolumeFiles)
-
- // Step 4: Generate EC shards locally on EC worker
- glog.V(1).Infof("WORKFLOW STEP 4: Generating EC shards locally on EC worker")
- t.SetProgress(40.0)
- localShardFiles, err := t.generateEcShardsLocally(localVolumeFiles, taskWorkDir)
- if err != nil {
- return fmt.Errorf("failed to generate EC shards locally: %v", err)
- }
- glog.V(1).Infof("WORKFLOW: EC shards generated locally: %d shard files", len(localShardFiles))
-
- // Step 5: Distribute shards from EC worker to destination servers
- glog.V(1).Infof("WORKFLOW STEP 5: Distributing EC shards from worker to destination servers")
- t.SetProgress(60.0)
- err = t.distributeEcShardsFromWorker(localShardFiles)
- if err != nil {
- return fmt.Errorf("failed to distribute EC shards from worker: %v", err)
- }
- glog.V(1).Infof("WORKFLOW: EC shards distributed to all destination servers")
-
- // Step 6: Mount EC shards on destination servers
- glog.V(1).Infof("WORKFLOW STEP 6: Mounting EC shards on destination servers")
- t.SetProgress(80.0)
- err = t.mountEcShardsOnDestinations()
- if err != nil {
- return fmt.Errorf("failed to mount EC shards: %v", err)
- }
- glog.V(1).Infof("WORKFLOW: EC shards mounted successfully")
-
- // Step 7: Delete original volume from all locations
- glog.V(1).Infof("WORKFLOW STEP 7: Deleting original volume %d from all replica servers", t.volumeID)
- t.SetProgress(90.0)
- err = t.deleteVolumeFromAllLocations(needle.VolumeId(t.volumeID), locationStrings)
- if err != nil {
- return fmt.Errorf("failed to delete original volume: %v", err)
- }
- glog.V(1).Infof("WORKFLOW: Original volume %d deleted from all locations", t.volumeID)
-
- t.SetProgress(100.0)
- glog.Infof("EC task completed successfully for volume %d", t.volumeID)
- return nil
-}
-
-// collectVolumeLocations returns the servers holding the volume. This is a
-// placeholder: the master is not consulted and volumeId is ignored; the
-// task's source server is reported as the only location.
-func (t *Task) collectVolumeLocations(volumeId needle.VolumeId) ([]pb.ServerAddress, error) {
-	source := pb.ServerAddress(t.sourceServer)
-	return []pb.ServerAddress{source}, nil
-}
-
-// cleanupExistingEcShards asks every server listed in the task's existing
-// shard locations to delete the EC shards recorded for it. The cleanup is
-// best effort: a failing server is logged and skipped, and the method always
-// returns nil.
-func (t *Task) cleanupExistingEcShards() error {
-	stale := t.existingShardLocations
-	if len(stale) == 0 {
-		glog.V(1).Infof("No existing EC shards to cleanup for volume %d", t.volumeID)
-		return nil
-	}
-
-	glog.V(1).Infof("Cleaning up existing EC shards for volume %d on %d servers", t.volumeID, len(stale))
-
-	for _, loc := range stale {
-		// Nothing recorded for this server.
-		if len(loc.ShardIds) == 0 {
-			continue
-		}
-
-		glog.V(1).Infof("Deleting existing EC shards %v from %s for volume %d", loc.ShardIds, loc.Node, t.volumeID)
-
-		deleteShards := func(client volume_server_pb.VolumeServerClient) error {
-			_, rpcErr := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
-				VolumeId:   t.volumeID,
-				Collection: t.collection,
-				ShardIds:   loc.ShardIds,
-			})
-			return rpcErr
-		}
-
-		if err := operation.WithVolumeServerClient(false, pb.ServerAddress(loc.Node), t.grpcDialOpt, deleteShards); err != nil {
-			// Keep going: one unreachable server must not abort the cleanup.
-			glog.Errorf("Failed to delete existing EC shards %v from %s for volume %d: %v", loc.ShardIds, loc.Node, t.volumeID, err)
-			continue
-		}
-		glog.V(1).Infof("Successfully deleted existing EC shards %v from %s for volume %d", loc.ShardIds, loc.Node, t.volumeID)
-	}
-
-	glog.V(1).Infof("Completed cleanup of existing EC shards for volume %d", t.volumeID)
-	return nil
-}
-
-// shouldPerformECEncoding reports whether the volume qualifies for EC
-// encoding. Currently the only criterion is that at least one location of the
-// volume is known; size checks and the like are not implemented.
-func (t *Task) shouldPerformECEncoding(volumeLocations []string) bool {
-	known := len(volumeLocations)
-	return known != 0
-}
-
-// markVolumeReadonlyOnAllReplicas marks the volume readonly on each server in
-// volumeLocations, in order. It stops at the first failing server and returns
-// that error; servers already processed stay readonly.
-func (t *Task) markVolumeReadonlyOnAllReplicas(volumeId needle.VolumeId, volumeLocations []string) error {
-	glog.V(1).Infof("Marking volume %d readonly on %d servers", volumeId, len(volumeLocations))
-
-	// The same request is sent to every replica.
-	markReadonly := func(client volume_server_pb.VolumeServerClient) error {
-		_, rpcErr := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
-			VolumeId: uint32(volumeId),
-		})
-		return rpcErr
-	}
-
-	for _, server := range volumeLocations {
-		glog.V(1).Infof("Marking volume %d readonly on %s", volumeId, server)
-
-		if err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOpt, markReadonly); err != nil {
-			glog.Errorf("Failed to mark volume %d readonly on %s: %v", volumeId, server, err)
-			return fmt.Errorf("failed to mark volume %d readonly on %s: %v", volumeId, server, err)
-		}
-
-		glog.V(1).Infof("Successfully marked volume %d readonly on %s", volumeId, server)
-	}
-
-	glog.V(1).Infof("Successfully marked volume %d readonly on all %d locations", volumeId, len(volumeLocations))
-	return nil
-}
-
-// copyVolumeFilesToWorker downloads the volume's .dat and .idx files from the
-// source server into workDir, naming them "<volumeID>.dat" and
-// "<volumeID>.idx". It returns the local paths under the keys "dat" and
-// "idx", or the first copy error.
-func (t *Task) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
-	// Both files share the same base path and differ only in extension.
-	basePath := filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID))
-
-	datFile := basePath + ".dat"
-	if err := t.copyFileFromSource(".dat", datFile); err != nil {
-		return nil, fmt.Errorf("failed to copy .dat file: %v", err)
-	}
-	glog.V(1).Infof("Copied .dat file to: %s", datFile)
-
-	idxFile := basePath + ".idx"
-	if err := t.copyFileFromSource(".idx", idxFile); err != nil {
-		return nil, fmt.Errorf("failed to copy .idx file: %v", err)
-	}
-	glog.V(1).Infof("Copied .idx file to: %s", idxFile)
-
-	return map[string]string{
-		"dat": datFile,
-		"idx": idxFile,
-	}, nil
-}
-
-// copyFileFromSource streams the volume file with extension ext (for example
-// ".dat") from the source server over the CopyFile gRPC call and writes it to
-// localPath.
-//
-// It returns an error when the stream cannot be opened, the local file cannot
-// be created, receiving or writing fails, or the local file cannot be closed.
-// The close error is reported because a failed close can mean the received
-// data never reached the disk. The local file may be left behind, partially
-// written, on failure.
-func (t *Task) copyFileFromSource(ext, localPath string) error {
-	return operation.WithVolumeServerClient(false, pb.ServerAddress(t.sourceServer), t.grpcDialOpt,
-		func(client volume_server_pb.VolumeServerClient) (retErr error) {
-			stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
-				VolumeId:   t.volumeID,
-				Collection: t.collection,
-				Ext:        ext,
-				StopOffset: uint64(math.MaxInt64), // no upper bound: copy the whole file
-			})
-			if err != nil {
-				return fmt.Errorf("failed to initiate file copy: %v", err)
-			}
-
-			// Create local file
-			localFile, err := os.Create(localPath)
-			if err != nil {
-				return fmt.Errorf("failed to create local file %s: %v", localPath, err)
-			}
-			defer func() {
-				// Surface a close failure unless an earlier error is already
-				// being returned.
-				if closeErr := localFile.Close(); closeErr != nil && retErr == nil {
-					retErr = fmt.Errorf("failed to close local file %s: %v", localPath, closeErr)
-				}
-			}()
-
-			// Stream data and write to local file
-			totalBytes := int64(0)
-			for {
-				resp, recvErr := stream.Recv()
-				if recvErr == io.EOF {
-					break
-				}
-				if recvErr != nil {
-					return fmt.Errorf("failed to receive file data: %v", recvErr)
-				}
-
-				if len(resp.FileContent) > 0 {
-					written, writeErr := localFile.Write(resp.FileContent)
-					if writeErr != nil {
-						return fmt.Errorf("failed to write to local file: %v", writeErr)
-					}
-					totalBytes += int64(written)
-				}
-			}
-
-			glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.sourceServer, localPath)
-			return nil
-		})
-}
-
-// generateEcShardsLocally encodes the locally copied volume into EC files.
-//
-// localFiles must hold the paths of the volume's .dat and .idx files under
-// the keys "dat" and "idx"; both are expected to share the same base name.
-// workDir is not used. The returned map holds the generated files: "ec00",
-// "ec01", ... for the shards that exist on disk, "ecx" for the sorted index
-// and "vif" for the volume info file. A missing shard file and a failure to
-// write the .vif file do not fail the call.
-func (t *Task) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
-	datFile := localFiles["dat"]
-	idxFile := localFiles["idx"]
-
-	if datFile == "" || idxFile == "" {
-		return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
-	}
-
-	// Get base name without extension for EC operations
-	baseName := strings.TrimSuffix(datFile, ".dat")
-
-	shardFiles := make(map[string]string)
-
-	glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
-
-	// Generate EC shard files (.ec00 ~ .ec13)
-	if err := erasure_coding.WriteEcFiles(baseName); err != nil {
-		return nil, fmt.Errorf("failed to generate EC shard files: %v", err)
-	}
-
-	// Generate .ecx file from .idx.
-	// NOTE(review): WriteSortedFileFromIdx is understood to take the base file
-	// name and append ".idx" / the given extension itself. Passing the .idx
-	// path made it look for "<base>.idx.idx". Confirm against the
-	// storage/erasure_coding package.
-	if err := erasure_coding.WriteSortedFileFromIdx(baseName, ".ecx"); err != nil {
-		return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
-	}
-
-	// Collect generated shard file paths
-	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
-		shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
-		if _, err := os.Stat(shardFile); err == nil {
-			shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile
-		}
-	}
-
-	// Add metadata files; the .ecx file sits next to the shards under the
-	// same base name.
-	ecxFile := baseName + ".ecx"
-	if _, err := os.Stat(ecxFile); err == nil {
-		shardFiles["ecx"] = ecxFile
-	}
-
-	// Generate .vif file (volume info)
-	vifFile := baseName + ".vif"
-	// Create basic volume info - in a real implementation, this would come from the original volume
-	volumeInfo := &volume_server_pb.VolumeInfo{
-		Version: uint32(needle.GetCurrentVersion()),
-	}
-	if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
-		glog.Warningf("Failed to create .vif file: %v", err)
-	} else {
-		shardFiles["vif"] = vifFile
-	}
-
-	glog.V(1).Infof("Generated %d EC files locally", len(shardFiles))
-	return shardFiles, nil
-}
-
-func (t *Task) copyEcShardsToDestinations() error { // asks each destination, concurrently, to copy its share of the EC shards from the source server; returns the first reported error
- if len(t.destinations) == 0 {
- return fmt.Errorf("no destinations specified for EC shard distribution")
- }
-
- destinations := t.destinations
-
- glog.V(1).Infof("Copying EC shards for volume %d to %d destinations", t.volumeID, len(destinations))
-
- // Prepare shard IDs 0 .. erasure_coding.TotalShardsCount-1
- var shardIds []uint32
- for i := 0; i < erasure_coding.TotalShardsCount; i++ {
- shardIds = append(shardIds, uint32(i))
- }
-
- // One goroutine per destination; the channel is buffered so each goroutine can report one error without blocking
- var wg sync.WaitGroup
- errorChan := make(chan error, len(destinations))
-
- // Track which disks have already received metadata files (server+disk)
- metadataFilesCopied := make(map[string]bool)
- var metadataMutex sync.Mutex
-
- // Split the shard IDs into contiguous, near-equal ranges, one per destination
- shardsPerDest := len(shardIds) / len(destinations)
- remainder := len(shardIds) % len(destinations)
-
- shardOffset := 0
- for i, dest := range destinations {
- wg.Add(1)
-
- shardsForThisDest := shardsPerDest
- if i < remainder {
- shardsForThisDest++ // The first `remainder` destinations take one extra shard
- }
-
- destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
- shardOffset += shardsForThisDest
-
- go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
- defer wg.Done()
-
- if t.IsCancelled() {
- errorChan <- fmt.Errorf("task cancelled during shard copy")
- return
- }
-
- // Key identifying the target disk (server+disk) in metadataFilesCopied
- diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
-
- glog.V(1).Infof("Copying shards %v from %s to %s (disk %d)",
- targetShardIds, t.sourceServer, destination.Node, destination.DiskId)
-
- // Claim the metadata copy for this disk under the mutex; only the first goroutine per disk gets true
- metadataMutex.Lock()
- needsMetadataFiles := !metadataFilesCopied[diskKey]
- if needsMetadataFiles {
- metadataFilesCopied[diskKey] = true
- }
- metadataMutex.Unlock()
-
- err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
- func(client volume_server_pb.VolumeServerClient) error {
- _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
- VolumeId: uint32(t.volumeID),
- Collection: t.collection,
- ShardIds: targetShardIds,
- CopyEcxFile: needsMetadataFiles, // Copy .ecx only once per disk
- CopyEcjFile: needsMetadataFiles, // Copy .ecj only once per disk
- CopyVifFile: needsMetadataFiles, // Copy .vif only once per disk
- SourceDataNode: t.sourceServer,
- DiskId: destination.DiskId, // Pass target disk ID
- })
- return copyErr
- })
-
- if err != nil {
- errorChan <- fmt.Errorf("failed to copy shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
- return
- }
-
- if needsMetadataFiles {
- glog.V(1).Infof("Successfully copied shards %v and metadata files (.ecx, .ecj, .vif) to %s disk %d",
- targetShardIds, destination.Node, destination.DiskId)
- } else {
- glog.V(1).Infof("Successfully copied shards %v to %s disk %d (metadata files already present)",
- targetShardIds, destination.Node, destination.DiskId)
- }
- }(dest, destShardIds)
- }
-
- wg.Wait()
- close(errorChan)
-
- // Return the first reported error, if any; receiving from the closed, empty channel yields nil
- if err := <-errorChan; err != nil {
- return err
- }
-
- glog.V(1).Infof("Successfully copied all EC shards for volume %d", t.volumeID)
- return nil
-}
-
-// distributeEcShardsFromWorker distributes locally generated EC shards to destination servers
-func (t *Task) distributeEcShardsFromWorker(localShardFiles map[string]string) error {
- if len(t.destinations) == 0 {
- return fmt.Errorf("no destinations specified for EC shard distribution")
- }
-
- destinations := t.destinations
-
- glog.V(1).Infof("Distributing EC shards for volume %d from worker to %d destinations", t.volumeID, len(destinations))
-
- // Prepare shard IDs (0-13 for EC shards)
- var shardIds []uint32
- for i := 0; i < erasure_coding.TotalShardsCount; i++ {
- shardIds = append(shardIds, uint32(i))
- }
-
- // Distribute shards across destinations
- var wg sync.WaitGroup
- errorChan := make(chan error, len(destinations))
-
- // Track which disks have already received metadata files (server+disk)
- metadataFilesCopied := make(map[string]bool)
- var metadataMutex sync.Mutex
-
- // For each destination, send a subset of shards
- shardsPerDest := len(shardIds) / len(destinations)
- remainder := len(shardIds) % len(destinations)
-
- shardOffset := 0
- for i, dest := range destinations {
- wg.Add(1)
-
- shardsForThisDest := shardsPerDest
- if i < remainder {
- shardsForThisDest++ // Distribute remainder shards
- }
-
- destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
- shardOffset += shardsForThisDest
-
- go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
- defer wg.Done()
-
- if t.IsCancelled() {
- errorChan <- fmt.Errorf("task cancelled during shard distribution")
- return
- }
-
- // Create disk-specific metadata key (server+disk)
- diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
-
- glog.V(1).Infof("Distributing shards %v from worker to %s (disk %d)",
- targetShardIds, destination.Node, destination.DiskId)
-
- // Check if this disk needs metadata files (only once per disk)
- metadataMutex.Lock()
- needsMetadataFiles := !metadataFilesCopied[diskKey]
- if needsMetadataFiles {
- metadataFilesCopied[diskKey] = true
- }
- metadataMutex.Unlock()
-
- // Send shard files to destination using HTTP upload (simplified for now)
- err := t.sendShardsToDestination(destination, targetShardIds, localShardFiles, needsMetadataFiles)
- if err != nil {
- errorChan <- fmt.Errorf("failed to send shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
- return
- }
-
- if needsMetadataFiles {
- glog.V(1).Infof("Successfully distributed shards %v and metadata files (.ecx, .vif) to %s disk %d",
- targetShardIds, destination.Node, destination.DiskId)
- } else {
- glog.V(1).Infof("Successfully distributed shards %v to %s disk %d (metadata files already present)",
- targetShardIds, destination.Node, destination.DiskId)
- }
- }(dest, destShardIds)
- }
-
- wg.Wait()
- close(errorChan)
-
- // Check for any distribution errors
- if err := <-errorChan; err != nil {
- return err
- }
-
- glog.V(1).Infof("Completed distributing EC shards for volume %d", t.volumeID)
- return nil
-}
-
-// sendShardsToDestination sends specific shard files from worker to a destination server (simplified)
-func (t *Task) sendShardsToDestination(destination *worker_pb.ECDestination, shardIds []uint32, localFiles map[string]string, includeMetadata bool) error {
- // For now, use a simplified approach - just upload the files
- // In a full implementation, this would use proper file upload mechanisms
- glog.V(2).Infof("Would send shards %v and metadata=%v to %s disk %d", shardIds, includeMetadata, destination.Node, destination.DiskId)
-
- // TODO: Implement actual file upload to volume server
- // This is a placeholder - actual implementation would:
- // 1. Open each shard file locally
- // 2. Upload via HTTP POST or gRPC stream to destination volume server
- // 3. Volume server would save to the specified disk_id
-
- return nil
-}
-
-// mountEcShardsOnDestinations mounts EC shards on all destination servers
-func (t *Task) mountEcShardsOnDestinations() error {
- if len(t.destinations) == 0 {
- return fmt.Errorf("no destinations specified for mounting EC shards")
- }
-
- destinations := t.destinations
-
- glog.V(1).Infof("Mounting EC shards for volume %d on %d destinations", t.volumeID, len(destinations))
-
- // Prepare all shard IDs (0-13)
- var allShardIds []uint32
- for i := 0; i < erasure_coding.TotalShardsCount; i++ {
- allShardIds = append(allShardIds, uint32(i))
- }
-
- var wg sync.WaitGroup
- errorChan := make(chan error, len(destinations))
-
- // Mount shards on each destination server
- for _, dest := range destinations {
- wg.Add(1)
-
- go func(destination *worker_pb.ECDestination) {
- defer wg.Done()
-
- if t.IsCancelled() {
- errorChan <- fmt.Errorf("task cancelled during shard mounting")
- return
- }
-
- glog.V(1).Infof("Mounting EC shards on %s disk %d", destination.Node, destination.DiskId)
-
- err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
- func(client volume_server_pb.VolumeServerClient) error {
- _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
- VolumeId: uint32(t.volumeID),
- Collection: t.collection,
- ShardIds: allShardIds, // Mount all available shards on each server
- })
- return mountErr
- })
-
- if err != nil {
- // It's normal for some servers to not have all shards, so log as warning rather than error
- glog.Warningf("Failed to mount some shards on %s disk %d (this may be normal): %v", destination.Node, destination.DiskId, err)
- } else {
- glog.V(1).Infof("Successfully mounted EC shards on %s disk %d", destination.Node, destination.DiskId)
- }
- }(dest)
- }
-
- wg.Wait()
- close(errorChan)
-
- // Check for any critical mounting errors
- select {
- case err := <-errorChan:
- if err != nil {
- glog.Warningf("Some shard mounting issues occurred: %v", err)
- }
- default:
- // No errors
- }
-
- glog.V(1).Infof("Completed mounting EC shards for volume %d", t.volumeID)
- return nil
-}
-
-// deleteVolumeFromAllLocations deletes the original volume from all replica servers
-func (t *Task) deleteVolumeFromAllLocations(volumeId needle.VolumeId, volumeLocations []string) error {
- glog.V(1).Infof("Deleting original volume %d from %d locations", volumeId, len(volumeLocations))
-
- for _, location := range volumeLocations {
- glog.V(1).Infof("Deleting volume %d from %s", volumeId, location)
-
- err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt,
- func(client volume_server_pb.VolumeServerClient) error {
- _, deleteErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
- VolumeId: uint32(volumeId),
- OnlyEmpty: false, // Force delete even if not empty since we've already created EC shards
- })
- return deleteErr
- })
-
- if err != nil {
- glog.Errorf("Failed to delete volume %d from %s: %v", volumeId, location, err)
- return fmt.Errorf("failed to delete volume %d from %s: %v", volumeId, location, err)
- }
-
- glog.V(1).Infof("Successfully deleted volume %d from %s", volumeId, location)
- }
-
- glog.V(1).Infof("Successfully deleted volume %d from all %d locations", volumeId, len(volumeLocations))
- return nil
-}
-
-// Register the task in the global registry
-func init() {
- types.RegisterGlobalTypedTask(types.TaskTypeErasureCoding, NewTask)
- glog.V(1).Infof("Registered EC task")
-}
diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go
new file mode 100644
index 000000000..a6a3f749f
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/ec_task.go
@@ -0,0 +1,660 @@
+package erasure_coding
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types/base"
+ "google.golang.org/grpc"
+)
+
+// ErasureCodingTask converts one volume into erasure-coded shards: Execute
+// copies the volume to the worker, encodes it locally, pushes the shards to
+// the planned destinations, mounts them, and deletes the original replicas.
+type ErasureCodingTask struct {
+ *base.BaseTask
+ server string // address of the volume server the .dat/.idx files are copied from
+ volumeID uint32 // volume being encoded
+ collection string // collection name sent with copy, receive and mount requests
+ workDir string // base working dir from the task params; Execute replaces it with the per-run directory
+ progress float64 // value returned by GetProgress
+
+ // EC parameters
+ dataShards int32 // defaults to erasure_coding.DataShardsCount; overwritten from the task params in Execute
+ parityShards int32 // defaults to erasure_coding.ParityShardsCount; overwritten from the task params in Execute
+ destinations []*worker_pb.ECDestination // shard destinations taken from the task params
+ shardAssignment map[string][]string // destination node -> assigned file names ("ec00".."ec13", "ecx", "vif"); filled by distributeEcShards, read by mountEcShards
+ replicas []string // volume replica servers for deletion
+}
+
+// NewErasureCodingTask returns a task for encoding volumeID (in collection)
+// that lives on server. The shard counts start at the package defaults and
+// are replaced by the values in the task parameters when Execute runs.
+func NewErasureCodingTask(id string, server string, volumeID uint32, collection string) *ErasureCodingTask {
+	task := &ErasureCodingTask{
+		BaseTask:   base.NewBaseTask(id, types.TaskTypeErasureCoding),
+		server:     server,
+		volumeID:   volumeID,
+		collection: collection,
+	}
+
+	// Default shard layout until Execute receives explicit parameters.
+	task.dataShards = erasure_coding.DataShardsCount
+	task.parityShards = erasure_coding.ParityShardsCount
+
+	return task
+}
+
+// Execute implements the UnifiedTask interface.
+//
+// It runs the whole conversion in a fixed order: mark the source volume
+// readonly, copy its .dat/.idx files into a per-run working directory, encode
+// the shards locally, send them to the destinations, mount them, and finally
+// delete the original volume. Any failing step aborts the task with a wrapped
+// error, so the original volume is only deleted after every earlier step
+// returned nil.
+//
+// params must carry erasure coding parameters; they overwrite the shard
+// counts, working directory, destinations and replicas stored on the task.
+// ctx is not referenced in this function body.
+func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
+ if params == nil {
+ return fmt.Errorf("task parameters are required")
+ }
+
+ ecParams := params.GetErasureCodingParams()
+ if ecParams == nil {
+ return fmt.Errorf("erasure coding parameters are required")
+ }
+
+ // Take the run configuration from the parameters (replacing constructor defaults).
+ t.dataShards = ecParams.DataShards
+ t.parityShards = ecParams.ParityShards
+ t.workDir = ecParams.WorkingDir
+ t.destinations = ecParams.Destinations
+ t.replicas = params.Replicas // Get replicas from task parameters
+
+ t.GetLogger().WithFields(map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "collection": t.collection,
+ "data_shards": t.dataShards,
+ "parity_shards": t.parityShards,
+ "destinations": len(t.destinations),
+ }).Info("Starting erasure coding task")
+
+ // Use the working directory from task parameters, or fall back to a default
+ baseWorkDir := t.workDir
+ if baseWorkDir == "" {
+ baseWorkDir = "/tmp/seaweedfs_ec_work"
+ }
+
+ // Create unique working directory for this task
+ // (named from the volume ID and the current Unix time in seconds)
+ taskWorkDir := filepath.Join(baseWorkDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
+ if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
+ return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
+ }
+ glog.V(1).Infof("Created working directory: %s", taskWorkDir)
+
+ // Update the task's working directory to the specific instance directory
+ t.workDir = taskWorkDir
+ glog.V(1).Infof("Task working directory configured: %s (logs will be written here)", taskWorkDir)
+
+ // Ensure cleanup of working directory (but preserve logs)
+ // This runs on every return path below, success or failure.
+ defer func() {
+ // Clean up volume files and EC shards, but preserve the directory structure and any logs
+ patterns := []string{"*.dat", "*.idx", "*.ec*", "*.vif"}
+ for _, pattern := range patterns {
+ matches, err := filepath.Glob(filepath.Join(taskWorkDir, pattern))
+ if err != nil {
+ continue
+ }
+ for _, match := range matches {
+ // Removal is best-effort: a failure is only logged.
+ if err := os.Remove(match); err != nil {
+ glog.V(2).Infof("Could not remove %s: %v", match, err)
+ }
+ }
+ }
+ glog.V(1).Infof("Cleaned up volume files from working directory: %s (logs preserved)", taskWorkDir)
+ }()
+
+ // Step 1: Mark volume readonly
+ t.ReportProgress(10.0)
+ t.GetLogger().Info("Marking volume readonly")
+ if err := t.markVolumeReadonly(); err != nil {
+ return fmt.Errorf("failed to mark volume readonly: %v", err)
+ }
+
+ // Step 2: Copy volume files to worker
+ t.ReportProgress(25.0)
+ t.GetLogger().Info("Copying volume files to worker")
+ localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to copy volume files: %v", err)
+ }
+
+ // Step 3: Generate EC shards locally
+ t.ReportProgress(40.0)
+ t.GetLogger().Info("Generating EC shards locally")
+ shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to generate EC shards: %v", err)
+ }
+
+ // Step 4: Distribute shards to destinations
+ t.ReportProgress(60.0)
+ t.GetLogger().Info("Distributing EC shards to destinations")
+ if err := t.distributeEcShards(shardFiles); err != nil {
+ return fmt.Errorf("failed to distribute EC shards: %v", err)
+ }
+
+ // Step 5: Mount EC shards
+ t.ReportProgress(80.0)
+ t.GetLogger().Info("Mounting EC shards")
+ if err := t.mountEcShards(); err != nil {
+ return fmt.Errorf("failed to mount EC shards: %v", err)
+ }
+
+ // Step 6: Delete original volume
+ // Destructive step: only reached when every earlier step returned nil.
+ t.ReportProgress(90.0)
+ t.GetLogger().Info("Deleting original volume")
+ if err := t.deleteOriginalVolume(); err != nil {
+ return fmt.Errorf("failed to delete original volume: %v", err)
+ }
+
+ t.ReportProgress(100.0)
+ // len(shardFiles) counts every generated file, metadata (.ecx/.vif) included.
+ glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed",
+ t.volumeID, t.server, len(shardFiles))
+
+ return nil
+}
+
+// Validate implements the UnifiedTask interface.
+//
+// It rejects missing parameters, a volume ID or source server that differs
+// from the one this task was created for, shard counts below one, and a
+// destination list shorter than the total number of shards. The first failing
+// check determines the returned error.
+func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error {
+	if params == nil {
+		return fmt.Errorf("task parameters are required")
+	}
+
+	ecParams := params.GetErasureCodingParams()
+	if ecParams == nil {
+		return fmt.Errorf("erasure coding parameters are required")
+	}
+
+	totalShards := ecParams.DataShards + ecParams.ParityShards
+	destinationCount := len(ecParams.Destinations)
+
+	switch {
+	case params.VolumeId != t.volumeID:
+		return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
+	case params.Server != t.server:
+		return fmt.Errorf("source server mismatch: expected %s, got %s", t.server, params.Server)
+	case ecParams.DataShards < 1:
+		return fmt.Errorf("invalid data shards: %d (must be >= 1)", ecParams.DataShards)
+	case ecParams.ParityShards < 1:
+		return fmt.Errorf("invalid parity shards: %d (must be >= 1)", ecParams.ParityShards)
+	case destinationCount < int(totalShards):
+		return fmt.Errorf("insufficient destinations: got %d, need %d", destinationCount, totalShards)
+	}
+
+	return nil
+}
+
+// EstimateTime implements the UnifiedTask interface.
+//
+// The estimate is a fixed value; params is not inspected.
+func (t *ErasureCodingTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
+	// Basic estimate based on simulated steps (sum of all step durations).
+	const fixedEstimate = 20 * time.Second
+	return fixedEstimate
+}
+
+// GetProgress returns the task's progress field.
+// NOTE(review): Execute reports progress through t.ReportProgress and no code
+// visible here assigns t.progress — confirm this value is actually updated.
+func (t *ErasureCodingTask) GetProgress() float64 {
+ return t.progress
+}
+
+// Helper methods for actual EC operations
+
+// markVolumeReadonly asks the source server to mark the task's volume as
+// readonly and returns the RPC error, if any.
+func (t *ErasureCodingTask) markVolumeReadonly() error {
+	sourceAddress := pb.ServerAddress(t.server)
+
+	markReadonly := func(client volume_server_pb.VolumeServerClient) error {
+		request := &volume_server_pb.VolumeMarkReadonlyRequest{
+			VolumeId: t.volumeID,
+		}
+		_, rpcErr := client.VolumeMarkReadonly(context.Background(), request)
+		return rpcErr
+	}
+
+	return operation.WithVolumeServerClient(false, sourceAddress, grpc.WithInsecure(), markReadonly)
+}
+
+// copyVolumeFilesToWorker downloads the volume's .dat and .idx files from the
+// source server into workDir. The returned map holds the local paths under
+// the keys "dat" and "idx"; on the first failed copy it returns a nil map and
+// the error.
+func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
+	localFiles := make(map[string]string)
+
+	// Same order as before: data file first, then the index.
+	for _, kind := range []string{"dat", "idx"} {
+		ext := "." + kind
+		target := filepath.Join(workDir, fmt.Sprintf("%d%s", t.volumeID, ext))
+
+		if err := t.copyFileFromSource(ext, target); err != nil {
+			return nil, fmt.Errorf("failed to copy %s file: %v", ext, err)
+		}
+		localFiles[kind] = target
+	}
+
+	return localFiles, nil
+}
+
+// copyFileFromSource streams the volume file with extension ext (for example
+// ".dat") from the source server into localPath using the CopyFile RPC.
+//
+// The local file is created (truncating any existing file) and filled chunk by
+// chunk. An error is returned if the stream cannot be started, a chunk cannot
+// be received or written, or the file cannot be closed cleanly — a failed
+// close can mean buffered data never reached disk, so it must not be reported
+// as success.
+func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error {
+	return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
+		func(client volume_server_pb.VolumeServerClient) (err error) {
+			// Cancel the stream on every return path so an early exit
+			// (e.g. a local write error) does not leak the server stream.
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
+				VolumeId:   t.volumeID,
+				Collection: t.collection,
+				Ext:        ext,
+				StopOffset: uint64(math.MaxInt64),
+			})
+			if err != nil {
+				return fmt.Errorf("failed to initiate file copy: %w", err)
+			}
+
+			// Create local file
+			localFile, err := os.Create(localPath)
+			if err != nil {
+				return fmt.Errorf("failed to create local file %s: %w", localPath, err)
+			}
+			defer func() {
+				// Surface close failures unless an earlier error is already being returned.
+				if closeErr := localFile.Close(); closeErr != nil && err == nil {
+					err = fmt.Errorf("failed to close local file %s: %w", localPath, closeErr)
+				}
+			}()
+
+			// Stream data and write to local file
+			var totalBytes int64
+			for {
+				resp, recvErr := stream.Recv()
+				if recvErr == io.EOF {
+					break
+				}
+				if recvErr != nil {
+					return fmt.Errorf("failed to receive file data: %w", recvErr)
+				}
+
+				if len(resp.FileContent) == 0 {
+					continue
+				}
+				written, writeErr := localFile.Write(resp.FileContent)
+				if writeErr != nil {
+					return fmt.Errorf("failed to write to local file: %w", writeErr)
+				}
+				totalBytes += int64(written)
+			}
+
+			glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath)
+			return nil
+		})
+}
+
+// generateEcShardsLocally encodes the locally copied volume into EC files.
+//
+// localFiles must contain the paths of the volume's data and index files
+// under the keys "dat" and "idx". The shard files, the .ecx index and a .vif
+// volume-info file are written next to the .dat file. workDir is accepted for
+// interface symmetry and is not used.
+//
+// The returned map is keyed by "ec00".."ecNN" for shards plus "ecx" and, when
+// it could be written, "vif". An error is returned if encoding fails or if any
+// expected shard or the .ecx file is missing afterwards: distributing an
+// incomplete set would be followed by deleting the original volume.
+func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
+	datFile := localFiles["dat"]
+	idxFile := localFiles["idx"]
+
+	if datFile == "" || idxFile == "" {
+		return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
+	}
+
+	// Get base name without extension for EC operations
+	baseName := strings.TrimSuffix(datFile, ".dat")
+	shardFiles := make(map[string]string)
+
+	glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
+
+	// Generate EC shard files (.ec00 ~ .ec13)
+	if err := erasure_coding.WriteEcFiles(baseName); err != nil {
+		return nil, fmt.Errorf("failed to generate EC shard files: %w", err)
+	}
+
+	// Generate .ecx file from .idx (use baseName, not full idx path)
+	if err := erasure_coding.WriteSortedFileFromIdx(baseName, ".ecx"); err != nil {
+		return nil, fmt.Errorf("failed to generate .ecx file: %w", err)
+	}
+
+	// Collect generated shard file paths; every shard must exist.
+	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+		shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
+		if _, err := os.Stat(shardFile); err != nil {
+			return nil, fmt.Errorf("EC shard file %s not available after generation: %w", shardFile, err)
+		}
+		shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile
+	}
+
+	// The .ecx index is required alongside the shards.
+	ecxFile := baseName + ".ecx"
+	if _, err := os.Stat(ecxFile); err != nil {
+		return nil, fmt.Errorf(".ecx file %s not available after generation: %w", ecxFile, err)
+	}
+	shardFiles["ecx"] = ecxFile
+
+	// Generate .vif file (volume info). Failure here is tolerated and only logged.
+	vifFile := baseName + ".vif"
+	volumeInfo := &volume_server_pb.VolumeInfo{
+		Version: uint32(needle.GetCurrentVersion()),
+	}
+	if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
+		glog.Warningf("Failed to create .vif file: %v", err)
+	} else {
+		shardFiles["vif"] = vifFile
+	}
+
+	glog.V(1).Infof("Generated %d EC files locally", len(shardFiles))
+	return shardFiles, nil
+}
+
+// distributeEcShards pushes the locally generated EC files to the destination
+// servers. It computes a per-destination assignment, remembers it on the task
+// for the later mount step, and then sends each assigned file in turn. The
+// first missing file or failed transfer aborts the distribution.
+func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error {
+	switch {
+	case len(t.destinations) == 0:
+		return fmt.Errorf("no destinations specified for EC shard distribution")
+	case len(shardFiles) == 0:
+		return fmt.Errorf("no shard files available for distribution")
+	}
+
+	// Decide which destination receives which files.
+	plan := t.createShardAssignment(shardFiles)
+	if len(plan) == 0 {
+		return fmt.Errorf("failed to create shard assignment")
+	}
+
+	// Kept on the task so mounting can target exactly the shards sent here.
+	t.shardAssignment = plan
+
+	for node, names := range plan {
+		logFields := map[string]interface{}{
+			"destination":     node,
+			"assigned_shards": len(names),
+			"shard_ids":       names,
+		}
+		t.GetLogger().WithFields(logFields).Info("Distributing assigned EC shards to destination")
+
+		// Only the files assigned to this destination are transferred.
+		for _, name := range names {
+			localPath, found := shardFiles[name]
+			if !found {
+				return fmt.Errorf("shard file %s not found for destination %s", name, node)
+			}
+
+			sendErr := t.sendShardFileToDestination(node, localPath, name)
+			if sendErr != nil {
+				return fmt.Errorf("failed to send %s to %s: %v", name, node, sendErr)
+			}
+		}
+	}
+
+	glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(plan))
+	return nil
+}
+
+// createShardAssignment decides which EC files each destination node receives.
+//
+// Shard entries of shardFiles (keys shaped like "ec00") are sorted and split
+// into contiguous runs, one run per entry of t.destinations, as evenly as
+// possible. Every node that ends up with at least one shard also gets the
+// "ecx" and "vif" entries when those files exist.
+//
+// The result is keyed by destination node. When several destinations name the
+// same node (for example different disks on one server) their runs are merged
+// under that node rather than the later run replacing the earlier one, so no
+// shard is dropped from the plan. An empty map is returned when there are no
+// shards or no destinations.
+func (t *ErasureCodingTask) createShardAssignment(shardFiles map[string]string) map[string][]string {
+	assignment := make(map[string][]string)
+
+	// Collect all available EC shards (ec00-ec13)
+	var availableShards []string
+	for shardType := range shardFiles {
+		if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
+			availableShards = append(availableShards, shardType)
+		}
+	}
+
+	// Sort shards for consistent assignment
+	sort.Strings(availableShards)
+
+	if len(availableShards) == 0 {
+		glog.Warningf("No EC shards found for assignment")
+		return assignment
+	}
+
+	numDestinations := len(t.destinations)
+	if numDestinations == 0 {
+		return assignment
+	}
+
+	// Strategy: distribute shards as evenly as possible across destinations.
+	// With 14 shards and N destinations, the first (14 mod N) destinations get
+	// ceil(14/N) shards and the rest get floor(14/N).
+	shardsPerDest := len(availableShards) / numDestinations
+	extraShards := len(availableShards) % numDestinations
+
+	shardIndex := 0
+	for i, dest := range t.destinations {
+		shardsToAssign := shardsPerDest
+		if i < extraShards {
+			shardsToAssign++ // one extra shard for the first 'extraShards' destinations
+		}
+
+		end := shardIndex + shardsToAssign
+		if end > len(availableShards) {
+			end = len(availableShards)
+		}
+		destShards := availableShards[shardIndex:end]
+		shardIndex = end
+
+		// Append instead of assign: destinations sharing a node must not
+		// overwrite each other's shards. The key is always set, even for an
+		// empty run, so every destination node appears in the plan.
+		assignment[dest.Node] = append(assignment[dest.Node], destShards...)
+
+		glog.V(2).Infof("Assigned shards %v to destination %s", destShards, dest.Node)
+	}
+
+	// Assign metadata files (.ecx, .vif) to each destination that has shards
+	// Note: .ecj files are created during mount, not during initial generation
+	_, hasEcx := shardFiles["ecx"]
+	_, hasVif := shardFiles["vif"]
+	for destNode, destShards := range assignment {
+		if len(destShards) == 0 {
+			continue
+		}
+		if hasEcx {
+			assignment[destNode] = append(assignment[destNode], "ecx")
+		}
+		if hasVif {
+			assignment[destNode] = append(assignment[destNode], "vif")
+		}
+
+		glog.V(2).Infof("Assigned metadata files (.ecx, .vif) to destination %s", destNode)
+	}
+
+	return assignment
+}
+
+// sendShardFileToDestination uploads one locally generated EC file to
+// destServer through the ReceiveFile client-streaming RPC.
+//
+// shardType selects the remote extension: "ecx" and "vif" are metadata files
+// (sent with shard ID 0), and a four-character name such as "ec07" is shard 7.
+// Any other name, including an "ec??" name whose suffix is not a number, is
+// rejected before anything is sent. The file is streamed as an info message
+// followed by 64KB content chunks; an error is returned if the transfer fails
+// or the server reports an error in its response.
+func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, shardType string) error {
+	return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), grpc.WithInsecure(),
+		func(client volume_server_pb.VolumeServerClient) error {
+			// Open the local shard file
+			file, err := os.Open(filePath)
+			if err != nil {
+				return fmt.Errorf("failed to open shard file %s: %w", filePath, err)
+			}
+			defer file.Close()
+
+			// Get file size
+			fileInfo, err := file.Stat()
+			if err != nil {
+				return fmt.Errorf("failed to get file info for %s: %w", filePath, err)
+			}
+
+			// Determine file extension and shard ID
+			var ext string
+			var shardId uint32 // stays 0 for metadata files, which have no shard ID
+			switch {
+			case shardType == "ecx":
+				ext = ".ecx"
+			case shardType == "vif":
+				ext = ".vif"
+			case strings.HasPrefix(shardType, "ec") && len(shardType) == 4:
+				// EC shard file like "ec00", "ec01", etc.
+				ext = "." + shardType
+				if _, scanErr := fmt.Sscanf(shardType[2:], "%d", &shardId); scanErr != nil {
+					// Without this check a malformed name would silently be sent as shard 0.
+					return fmt.Errorf("invalid shard type %s: %w", shardType, scanErr)
+				}
+			default:
+				return fmt.Errorf("unknown shard type: %s", shardType)
+			}
+
+			// Cancel the stream on every return path so an aborted upload
+			// does not leak the client stream.
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			// Create streaming client
+			stream, err := client.ReceiveFile(ctx)
+			if err != nil {
+				return fmt.Errorf("failed to create receive stream: %w", err)
+			}
+
+			// Send file info first
+			err = stream.Send(&volume_server_pb.ReceiveFileRequest{
+				Data: &volume_server_pb.ReceiveFileRequest_Info{
+					Info: &volume_server_pb.ReceiveFileInfo{
+						VolumeId:   t.volumeID,
+						Ext:        ext,
+						Collection: t.collection,
+						IsEcVolume: true,
+						ShardId:    shardId,
+						FileSize:   uint64(fileInfo.Size()),
+					},
+				},
+			})
+			if err != nil {
+				return fmt.Errorf("failed to send file info: %w", err)
+			}
+
+			// Send file content in chunks
+			buffer := make([]byte, 64*1024) // 64KB chunks
+			for {
+				n, readErr := file.Read(buffer)
+				if n > 0 {
+					err = stream.Send(&volume_server_pb.ReceiveFileRequest{
+						Data: &volume_server_pb.ReceiveFileRequest_FileContent{
+							FileContent: buffer[:n],
+						},
+					})
+					if err != nil {
+						return fmt.Errorf("failed to send file content: %w", err)
+					}
+				}
+				if readErr == io.EOF {
+					break
+				}
+				if readErr != nil {
+					return fmt.Errorf("failed to read file: %w", readErr)
+				}
+			}
+
+			// Close stream and get response
+			resp, err := stream.CloseAndRecv()
+			if err != nil {
+				return fmt.Errorf("failed to close stream: %w", err)
+			}
+
+			if resp.Error != "" {
+				return fmt.Errorf("server error: %s", resp.Error)
+			}
+
+			glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
+			return nil
+		})
+}
+
+// mountEcShards mounts, on every destination in the stored shard assignment,
+// exactly the EC shards that were assigned to it. Metadata entries ("ecx",
+// "vif") are skipped because only shards are mounted.
+//
+// Every destination is attempted even after a failure, and all failures are
+// then returned as one error. The caller deletes the original volume after
+// this step, so a mount failure must not be reported as success.
+func (t *ErasureCodingTask) mountEcShards() error {
+	if t.shardAssignment == nil {
+		return fmt.Errorf("shard assignment not available for mounting")
+	}
+
+	var mountErrors []string
+
+	// Mount only assigned shards on each destination
+	for destNode, assignedShards := range t.shardAssignment {
+		// Convert shard names to shard IDs for mounting
+		var shardIds []uint32
+		for _, shardType := range assignedShards {
+			// Skip metadata files (.ecx, .vif) - only mount EC shards
+			if !strings.HasPrefix(shardType, "ec") || len(shardType) != 4 {
+				continue
+			}
+
+			// Parse shard ID from "ec00", "ec01", etc.
+			var shardId uint32
+			if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err != nil {
+				mountErrors = append(mountErrors, fmt.Sprintf("invalid shard name %s assigned to %s: %v", shardType, destNode, err))
+				continue
+			}
+			shardIds = append(shardIds, shardId)
+		}
+
+		if len(shardIds) == 0 {
+			glog.V(1).Infof("No EC shards to mount on %s (only metadata files)", destNode)
+			continue
+		}
+
+		glog.V(1).Infof("Mounting shards %v on %s", shardIds, destNode)
+
+		err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(),
+			func(client volume_server_pb.VolumeServerClient) error {
+				_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+					VolumeId:   t.volumeID,
+					Collection: t.collection,
+					ShardIds:   shardIds,
+				})
+				return mountErr
+			})
+		if err != nil {
+			glog.Warningf("Failed to mount shards %v on %s: %v", shardIds, destNode, err)
+			mountErrors = append(mountErrors, fmt.Sprintf("%s (shards %v): %v", destNode, shardIds, err))
+			continue
+		}
+
+		glog.V(1).Infof("Successfully mounted EC shards %v on %s", shardIds, destNode)
+	}
+
+	if len(mountErrors) > 0 {
+		// Map iteration order is random; sort so the message is stable.
+		sort.Strings(mountErrors)
+		return fmt.Errorf("failed to mount EC shards (%d problem(s)): %s", len(mountErrors), strings.Join(mountErrors, "; "))
+	}
+
+	return nil
+}
+
+// deleteOriginalVolume removes the original volume from every known replica
+// server, falling back to the source server alone when no replicas were
+// provided. Deletion is best-effort: failures are logged and counted, but the
+// function always returns nil.
+func (t *ErasureCodingTask) deleteOriginalVolume() error {
+	// Replica list comes from the task parameters (set during detection).
+	targets := t.getReplicas()
+	if len(targets) == 0 {
+		glog.Warningf("No replicas found for volume %d, falling back to source server only", t.volumeID)
+		targets = []string{t.server}
+	}
+
+	glog.V(1).Infof("Deleting volume %d from %d replica servers: %v", t.volumeID, len(targets), targets)
+
+	var failures []string
+	deleted := 0
+
+	for _, target := range targets {
+		deleteOnServer := func(client volume_server_pb.VolumeServerClient) error {
+			_, rpcErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
+				VolumeId:  t.volumeID,
+				OnlyEmpty: false, // Force delete since we've created EC shards
+			})
+			return rpcErr
+		}
+
+		deleteErr := operation.WithVolumeServerClient(false, pb.ServerAddress(target), grpc.WithInsecure(), deleteOnServer)
+		if deleteErr == nil {
+			deleted++
+			glog.V(1).Infof("Successfully deleted volume %d from replica server %s", t.volumeID, target)
+			continue
+		}
+
+		failures = append(failures, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, target, deleteErr))
+		glog.Warningf("Failed to delete volume %d from replica server %s: %v", t.volumeID, target, deleteErr)
+	}
+
+	// Report results
+	if len(failures) == 0 {
+		glog.V(1).Infof("Successfully deleted volume %d from all %d replica servers", t.volumeID, len(targets))
+		return nil
+	}
+
+	// Don't return error - EC task should still be considered successful if shards are mounted
+	glog.Warningf("Some volume deletions failed (%d/%d successful): %v", deleted, len(targets), failures)
+	return nil
+}
+
+// getReplicas returns the replica server list stored on the task.
+// Execute fills t.replicas from the task parameters; the result is nil or
+// empty when no replicas were supplied.
+func (t *ErasureCodingTask) getReplicas() []string {
+ return t.replicas
+}
diff --git a/weed/worker/tasks/erasure_coding/monitoring.go b/weed/worker/tasks/erasure_coding/monitoring.go
new file mode 100644
index 000000000..799eb62c8
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/monitoring.go
@@ -0,0 +1,229 @@
+package erasure_coding
+
+import (
+ "sync"
+ "time"
+)
+
+// ErasureCodingMetrics contains erasure coding-specific monitoring data.
+//
+// Every method defined on *ErasureCodingMetrics in this file acquires the mutex
+// field before reading or writing any other field. Because the struct holds a
+// sync.RWMutex, it should be shared through a pointer rather than copied;
+// GetMetrics builds its snapshot field by field for that reason.
+type ErasureCodingMetrics struct {
+ // Execution metrics: cumulative counters advanced by RecordVolumeEncoded.
+ VolumesEncoded int64 `json:"volumes_encoded"`
+ TotalShardsCreated int64 `json:"total_shards_created"`
+ TotalDataProcessed int64 `json:"total_data_processed"`
+ TotalSourcesRemoved int64 `json:"total_sources_removed"`
+ LastEncodingTime time.Time `json:"last_encoding_time"`
+
+ // Performance metrics: the Average* fields are integer moving averages
+ // maintained by RecordVolumeEncoded (previous value weighted 4, newest sample 1),
+ // so fractional parts are truncated on every update.
+ AverageEncodingTime int64 `json:"average_encoding_time_seconds"`
+ AverageShardSize int64 `json:"average_shard_size"`
+ AverageDataShards int `json:"average_data_shards"`
+ AverageParityShards int `json:"average_parity_shards"`
+ SuccessfulOperations int64 `json:"successful_operations"`
+ FailedOperations int64 `json:"failed_operations"`
+
+ // Distribution metrics: ShardsPerRack is keyed by "<dataCenter>:<rack>"
+ // (see RecordShardPlacement).
+ // NOTE(review): the scale of PlacementSuccessRate (0-1 vs 0-100) is whatever
+ // callers pass to UpdatePlacementSuccessRate - confirm against callers.
+ ShardsPerDataCenter map[string]int64 `json:"shards_per_datacenter"`
+ ShardsPerRack map[string]int64 `json:"shards_per_rack"`
+ PlacementSuccessRate float64 `json:"placement_success_rate"`
+
+ // Current task metrics: plain last-written values, not aggregates.
+ CurrentVolumeSize int64 `json:"current_volume_size"`
+ CurrentShardCount int `json:"current_shard_count"`
+ VolumesPendingEncoding int `json:"volumes_pending_encoding"`
+
+ // mutex guards every field above; it is unexported and therefore not serialized.
+ mutex sync.RWMutex
+}
+
+// NewErasureCodingMetrics allocates a metrics instance with both distribution
+// maps ready for writes and LastEncodingTime stamped with the creation time.
+func NewErasureCodingMetrics() *ErasureCodingMetrics {
+	metrics := new(ErasureCodingMetrics)
+	metrics.LastEncodingTime = time.Now()
+	metrics.ShardsPerDataCenter = map[string]int64{}
+	metrics.ShardsPerRack = map[string]int64{}
+	return metrics
+}
+
+// RecordVolumeEncoded folds one successful encoding run into the metrics.
+//
+// It advances the cumulative counters, stamps LastEncodingTime with the current
+// time, and updates the moving averages. An average that is still zero is
+// treated as "no sample yet" and is replaced by the new sample outright;
+// otherwise the previous value is weighted 4 to the new sample's 1 using
+// integer division. The shard-size average is skipped when shardsCreated is
+// not positive, which also avoids dividing by zero.
+func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	// blend applies the seed-or-smooth rule shared by the int64 averages.
+	blend := func(current, sample int64) int64 {
+		if current == 0 {
+			return sample
+		}
+		return (current*4 + sample) / 5
+	}
+
+	// Cumulative counters.
+	m.VolumesEncoded++
+	m.SuccessfulOperations++
+	m.TotalShardsCreated += int64(shardsCreated)
+	m.TotalDataProcessed += volumeSize
+	if sourceRemoved {
+		m.TotalSourcesRemoved++
+	}
+	m.LastEncodingTime = time.Now()
+
+	// Encoding time is tracked in whole seconds.
+	m.AverageEncodingTime = blend(m.AverageEncodingTime, int64(encodingTime.Seconds()))
+
+	// Per-shard size is derived from the volume size.
+	if shardsCreated > 0 {
+		m.AverageShardSize = blend(m.AverageShardSize, volumeSize/int64(shardsCreated))
+	}
+
+	// Data and parity averages are seeded together, keyed on the data-shard
+	// average alone.
+	if m.AverageDataShards != 0 {
+		m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5
+		m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5
+		return
+	}
+	m.AverageDataShards = dataShards
+	m.AverageParityShards = parityShards
+}
+
+// RecordFailure counts one failed erasure coding operation.
+func (m *ErasureCodingMetrics) RecordFailure() {
+	m.mutex.Lock()
+	m.FailedOperations += 1
+	m.mutex.Unlock()
+}
+
+// RecordShardPlacement counts one placed shard against its data center and
+// against its rack. Racks are keyed as "<dataCenter>:<rack>" so identical rack
+// names in different data centers stay separate.
+func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	m.ShardsPerDataCenter[dataCenter] += 1
+	m.ShardsPerRack[dataCenter+":"+rack] += 1
+}
+
+// UpdateCurrentVolumeInfo overwrites the size and shard count recorded for the
+// volume currently being processed.
+func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) {
+	m.mutex.Lock()
+	m.CurrentVolumeSize, m.CurrentShardCount = volumeSize, shardCount
+	m.mutex.Unlock()
+}
+
+// SetVolumesPendingEncoding overwrites the count of volumes waiting to be encoded.
+func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) {
+	m.mutex.Lock()
+	m.VolumesPendingEncoding = count
+	m.mutex.Unlock()
+}
+
+// UpdatePlacementSuccessRate folds a new placement success sample into the
+// stored rate. A stored rate of exactly zero is replaced by the sample;
+// otherwise the stored rate keeps 80% weight and the sample contributes 20%.
+func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if m.PlacementSuccessRate == 0 {
+		// Nothing recorded yet (or the rate decayed to zero): take the sample as-is.
+		m.PlacementSuccessRate = rate
+		return
+	}
+	m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate
+}
+
+// GetMetrics returns a point-in-time snapshot of the metrics.
+//
+// The snapshot is written as a fresh struct literal so the mutex is never
+// copied, and both distribution maps are duplicated so the caller's copy is
+// independent of later updates to the receiver.
+func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics {
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+
+	// duplicate always returns a non-nil map, even for a nil source.
+	duplicate := func(source map[string]int64) map[string]int64 {
+		target := make(map[string]int64)
+		for key, count := range source {
+			target[key] = count
+		}
+		return target
+	}
+
+	return ErasureCodingMetrics{
+		// Execution metrics
+		VolumesEncoded:      m.VolumesEncoded,
+		TotalShardsCreated:  m.TotalShardsCreated,
+		TotalDataProcessed:  m.TotalDataProcessed,
+		TotalSourcesRemoved: m.TotalSourcesRemoved,
+		LastEncodingTime:    m.LastEncodingTime,
+
+		// Performance metrics
+		AverageEncodingTime:  m.AverageEncodingTime,
+		AverageShardSize:     m.AverageShardSize,
+		AverageDataShards:    m.AverageDataShards,
+		AverageParityShards:  m.AverageParityShards,
+		SuccessfulOperations: m.SuccessfulOperations,
+		FailedOperations:     m.FailedOperations,
+
+		// Distribution metrics
+		ShardsPerDataCenter:  duplicate(m.ShardsPerDataCenter),
+		ShardsPerRack:        duplicate(m.ShardsPerRack),
+		PlacementSuccessRate: m.PlacementSuccessRate,
+
+		// Current task metrics
+		CurrentVolumeSize:      m.CurrentVolumeSize,
+		CurrentShardCount:      m.CurrentShardCount,
+		VolumesPendingEncoding: m.VolumesPendingEncoding,
+	}
+}
+
+// GetSuccessRate reports successful operations as a percentage of all recorded
+// operations. With nothing recorded yet it reports 100.
+func (m *ErasureCodingMetrics) GetSuccessRate() float64 {
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+
+	succeeded := m.SuccessfulOperations
+	attempts := succeeded + m.FailedOperations
+	if attempts == 0 {
+		return 100.0
+	}
+	return float64(succeeded) / float64(attempts) * 100.0
+}
+
+// GetAverageDataProcessed reports TotalDataProcessed divided by the number of
+// encoded volumes, or 0 when no volume has been encoded.
+func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 {
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+
+	if encoded := m.VolumesEncoded; encoded != 0 {
+		return float64(m.TotalDataProcessed) / float64(encoded)
+	}
+	return 0
+}
+
+// GetSourceRemovalRate reports, as a percentage, how many encoded volumes also
+// had their source removed. It reports 0 when no volume has been encoded.
+func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 {
+	m.mutex.RLock()
+	defer m.mutex.RUnlock()
+
+	if encoded := m.VolumesEncoded; encoded != 0 {
+		return float64(m.TotalSourcesRemoved) / float64(encoded) * 100.0
+	}
+	return 0
+}
+
+// Reset returns every metric to its initial state: counters and averages go to
+// zero, both distribution maps are replaced with empty maps, and
+// LastEncodingTime is stamped with the current time (matching
+// NewErasureCodingMetrics).
+//
+// Fields are cleared one by one instead of assigning a fresh struct to *m.
+// Overwriting *m while holding m.mutex would replace the locked mutex with a
+// zero-value one, and the deferred Unlock would then abort the process with
+// "sync: Unlock of unlocked RWMutex".
+func (m *ErasureCodingMetrics) Reset() {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	// Execution metrics
+	m.VolumesEncoded = 0
+	m.TotalShardsCreated = 0
+	m.TotalDataProcessed = 0
+	m.TotalSourcesRemoved = 0
+	m.LastEncodingTime = time.Now()
+
+	// Performance metrics
+	m.AverageEncodingTime = 0
+	m.AverageShardSize = 0
+	m.AverageDataShards = 0
+	m.AverageParityShards = 0
+	m.SuccessfulOperations = 0
+	m.FailedOperations = 0
+
+	// Distribution metrics
+	m.ShardsPerDataCenter = make(map[string]int64)
+	m.ShardsPerRack = make(map[string]int64)
+	m.PlacementSuccessRate = 0
+
+	// Current task metrics
+	m.CurrentVolumeSize = 0
+	m.CurrentShardCount = 0
+	m.VolumesPendingEncoding = 0
+}
+
+// globalErasureCodingMetrics is the package-level metrics instance for erasure
+// coding tasks, created by NewErasureCodingMetrics when the package initializes.
+var globalErasureCodingMetrics = NewErasureCodingMetrics()
+
+// GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance.
+// It hands back the pointer stored in globalErasureCodingMetrics, not a copy,
+// so updates made through the result are visible to every other holder.
+func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics {
+ return globalErasureCodingMetrics
+}
diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/register.go
index 62cfe6b56..883aaf965 100644
--- a/weed/worker/tasks/erasure_coding/ec_register.go
+++ b/weed/worker/tasks/erasure_coding/register.go
@@ -5,6 +5,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
@@ -35,9 +36,19 @@ func RegisterErasureCodingTask() {
Icon: "fas fa-shield-alt text-success",
Capabilities: []string{"erasure_coding", "data_protection"},
- Config: config,
- ConfigSpec: GetConfigSpec(),
- CreateTask: nil, // Uses typed task system - see init() in ec.go
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) {
+ if params == nil {
+ return nil, fmt.Errorf("task parameters are required")
+ }
+ return NewErasureCodingTask(
+ fmt.Sprintf("erasure_coding-%d", params.VolumeId),
+ params.Server,
+ params.VolumeId,
+ params.Collection,
+ ), nil
+ },
DetectionFunc: Detection,
ScanInterval: 1 * time.Hour,
SchedulingFunc: Scheduling,
diff --git a/weed/worker/tasks/erasure_coding/scheduling.go b/weed/worker/tasks/erasure_coding/scheduling.go
new file mode 100644
index 000000000..d9d891e04
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/scheduling.go
@@ -0,0 +1,40 @@
+package erasure_coding
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Scheduling decides whether an erasure coding task may be started now.
+//
+// It answers true only when all of the following hold: at least one worker is
+// available, the number of running erasure coding tasks is below
+// ecConfig.MaxConcurrent, and some available worker lists the erasure coding
+// task type among its capabilities. The task argument itself is not inspected.
+// config must hold a *Config; the type assertion is unchecked and panics otherwise.
+func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
+	ecConfig := config.(*Config)
+
+	// Without any worker there is nothing to schedule on.
+	if len(availableWorkers) == 0 {
+		return false
+	}
+
+	// Enforce the concurrency cap against EC tasks already in flight.
+	inFlight := 0
+	for i := range runningTasks {
+		if runningTasks[i].Type == types.TaskTypeErasureCoding {
+			inFlight++
+		}
+	}
+	if inFlight >= ecConfig.MaxConcurrent {
+		return false
+	}
+
+	// A single EC-capable worker is enough.
+	for _, candidate := range availableWorkers {
+		for _, offered := range candidate.Capabilities {
+			if offered == types.TaskTypeErasureCoding {
+				return true
+			}
+		}
+	}
+	return false
+}