Diffstat (limited to 'weed/worker/tasks/erasure_coding')
| -rw-r--r-- | weed/worker/tasks/erasure_coding/config.go | 207 |
| -rw-r--r-- | weed/worker/tasks/erasure_coding/detection.go | 140 |
| -rw-r--r-- | weed/worker/tasks/erasure_coding/ec.go | 792 |
| -rw-r--r-- | weed/worker/tasks/erasure_coding/ec_detector.go | 139 |
| -rw-r--r-- | weed/worker/tasks/erasure_coding/ec_register.go | 109 |
| -rw-r--r-- | weed/worker/tasks/erasure_coding/ec_scheduler.go | 114 |
| -rw-r--r-- | weed/worker/tasks/erasure_coding/ui.go | 309 |
7 files changed, 1146 insertions, 664 deletions
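Before the per-file diffs: the new typed task in ec.go fixes the shard layout to the package constants (10 data + 4 parity, 14 shards total, matching the 0-13 shard IDs used throughout) and its ValidateTyped rejects plans that provide fewer destinations than total shards. A minimal standalone sketch of that destination-count check, with the constants assumed to equal erasure_coding.DataShardsCount and ParityShardsCount and the function name purely illustrative:

package main

import "fmt"

const (
	dataShards   = 10 // assumed value of erasure_coding.DataShardsCount
	parityShards = 4  // assumed value of erasure_coding.ParityShardsCount
)

// validateDestinations mirrors the rule in ValidateTyped: each of the 14
// shards needs its own destination disk, so a plan with fewer destinations
// than data+parity shards is rejected before any work starts.
func validateDestinations(destinations []string) error {
	total := dataShards + parityShards
	if len(destinations) < total {
		return fmt.Errorf("insufficient destinations: need %d, have %d", total, len(destinations))
	}
	return nil
}

func main() {
	// Prints: insufficient destinations: need 14, have 2
	fmt.Println(validateDestinations([]string{"vs1:8080", "vs2:8080"}))
}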
diff --git a/weed/worker/tasks/erasure_coding/config.go b/weed/worker/tasks/erasure_coding/config.go new file mode 100644 index 000000000..1f70fb8db --- /dev/null +++ b/weed/worker/tasks/erasure_coding/config.go @@ -0,0 +1,207 @@ +package erasure_coding + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with erasure coding specific settings +type Config struct { + base.BaseConfig + QuietForSeconds int `json:"quiet_for_seconds"` + FullnessRatio float64 `json:"fullness_ratio"` + CollectionFilter string `json:"collection_filter"` + MinSizeMB int `json:"min_size_mb"` +} + +// NewDefaultConfig creates a new default erasure coding configuration +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 60 * 60, // 1 hour + MaxConcurrent: 1, + }, + QuietForSeconds: 300, // 5 minutes + FullnessRatio: 0.8, // 80% + CollectionFilter: "", + MinSizeMB: 30, // 30MB (more reasonable than 100MB) + } +} + +// GetConfigSpec returns the configuration schema for erasure coding tasks +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Erasure Coding Tasks", + Description: "Whether erasure coding tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic erasure coding task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 60 * 60, + MinValue: 10 * 60, + MaxValue: 24 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing erasure coding", + HelpText: "The system will check for volumes that need erasure coding at this interval", + Placeholder: "1", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 1, + MinValue: 1, + MaxValue: 5, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of erasure coding tasks that can run simultaneously", + HelpText: "Limits the number of erasure coding operations running at the same time", + Placeholder: "1 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "quiet_for_seconds", + JSONName: "quiet_for_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 300, + MinValue: 60, + MaxValue: 3600, + Required: true, + DisplayName: "Quiet Period", + Description: "Minimum time volume must be quiet before erasure coding", + HelpText: "Volume must not be modified for this duration before erasure coding", + Placeholder: "5", + Unit: config.UnitMinutes, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "fullness_ratio", + JSONName: "fullness_ratio", + Type: config.FieldTypeFloat, + DefaultValue: 0.8, + MinValue: 0.1, + MaxValue: 1.0, + Required: true, + DisplayName: "Fullness Ratio", + Description: "Minimum fullness ratio to trigger erasure coding", + HelpText: "Only volumes with this fullness ratio or higher will be erasure coded", + 
Placeholder: "0.80 (80%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "collection_filter", + JSONName: "collection_filter", + Type: config.FieldTypeString, + DefaultValue: "", + Required: false, + DisplayName: "Collection Filter", + Description: "Only process volumes from specific collections", + HelpText: "Leave empty to process all collections, or specify collection name", + Placeholder: "my_collection", + InputType: "text", + CSSClasses: "form-control", + }, + { + Name: "min_size_mb", + JSONName: "min_size_mb", + Type: config.FieldTypeInt, + DefaultValue: 30, + MinValue: 1, + MaxValue: 1000, + Required: true, + DisplayName: "Minimum Size (MB)", + Description: "Minimum volume size to consider for erasure coding", + HelpText: "Only volumes larger than this size will be considered for erasure coding", + Placeholder: "30", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + }, + } +} + +// ToTaskPolicy converts configuration to a TaskPolicy protobuf message +func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { + return &worker_pb.TaskPolicy{ + Enabled: c.Enabled, + MaxConcurrent: int32(c.MaxConcurrent), + RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), + CheckIntervalSeconds: int32(c.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ + ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ + FullnessRatio: float64(c.FullnessRatio), + QuietForSeconds: int32(c.QuietForSeconds), + MinVolumeSizeMb: int32(c.MinSizeMB), + CollectionFilter: c.CollectionFilter, + }, + }, + } +} + +// FromTaskPolicy loads configuration from a TaskPolicy protobuf message +func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { + if policy == nil { + return fmt.Errorf("policy is nil") + } + + // Set general TaskPolicy fields + c.Enabled = policy.Enabled + c.MaxConcurrent = int(policy.MaxConcurrent) + c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping + + // Set erasure coding-specific fields from the task config + if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil { + c.FullnessRatio = float64(ecConfig.FullnessRatio) + c.QuietForSeconds = int(ecConfig.QuietForSeconds) + c.MinSizeMB = int(ecConfig.MinVolumeSizeMb) + c.CollectionFilter = ecConfig.CollectionFilter + } + + return nil +} + +// LoadConfigFromPersistence loads configuration from the persistence layer if available +func LoadConfigFromPersistence(configPersistence interface{}) *Config { + config := NewDefaultConfig() + + // Try to load from persistence if available + if persistence, ok := configPersistence.(interface { + LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) + }); ok { + if policy, err := persistence.LoadErasureCodingTaskPolicy(); err == nil && policy != nil { + if err := config.FromTaskPolicy(policy); err == nil { + glog.V(1).Infof("Loaded erasure coding configuration from persistence") + return config + } + } + } + + glog.V(1).Infof("Using default erasure coding configuration") + return config +} diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go new file mode 100644 index 000000000..1a2558396 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -0,0 +1,140 @@ +package erasure_coding + +import ( + "fmt" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + 
"github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Detection implements the detection logic for erasure coding tasks +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + ecConfig := config.(*Config) + var results []*types.TaskDetectionResult + now := time.Now() + quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second + minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum + + debugCount := 0 + skippedAlreadyEC := 0 + skippedTooSmall := 0 + skippedCollectionFilter := 0 + skippedQuietTime := 0 + skippedFullness := 0 + + for _, metric := range metrics { + // Skip if already EC volume + if metric.IsECVolume { + skippedAlreadyEC++ + continue + } + + // Check minimum size requirement + if metric.Size < minSizeBytes { + skippedTooSmall++ + continue + } + + // Check collection filter if specified + if ecConfig.CollectionFilter != "" { + // Parse comma-separated collections + allowedCollections := make(map[string]bool) + for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") { + allowedCollections[strings.TrimSpace(collection)] = true + } + // Skip if volume's collection is not in the allowed list + if !allowedCollections[metric.Collection] { + skippedCollectionFilter++ + continue + } + } + + // Check quiet duration and fullness criteria + if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeErasureCoding, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: types.TaskPriorityLow, // EC is not urgent + Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)", + metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100, + float64(metric.Size)/(1024*1024)), + ScheduleAt: now, + } + results = append(results, result) + } else { + // Count debug reasons + if debugCount < 5 { // Limit to avoid spam + if metric.Age < quietThreshold { + skippedQuietTime++ + } + if metric.FullnessRatio < ecConfig.FullnessRatio { + skippedFullness++ + } + } + debugCount++ + } + } + + // Log debug summary if no tasks were created + if len(results) == 0 && len(metrics) > 0 { + totalVolumes := len(metrics) + glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)", + totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness) + + // Show details for first few volumes + for i, metric := range metrics { + if i >= 3 || metric.IsECVolume { // Limit to first 3 non-EC volumes + continue + } + sizeMB := float64(metric.Size) / (1024 * 1024) + glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)", + metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute), + metric.FullnessRatio*100, ecConfig.FullnessRatio*100) + } + } + + return results, nil +} + +// Scheduling implements the scheduling logic for erasure coding tasks +func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + ecConfig := config.(*Config) + + // Check if we have available workers + if 
len(availableWorkers) == 0 { + return false + } + + // Count running EC tasks + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeErasureCoding { + runningCount++ + } + } + + // Check concurrency limit + if runningCount >= ecConfig.MaxConcurrent { + return false + } + + // Check if any worker can handle EC tasks + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeErasureCoding { + return true + } + } + } + + return false +} diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go index 641dfc6b5..8dc7a1cd0 100644 --- a/weed/worker/tasks/erasure_coding/ec.go +++ b/weed/worker/tasks/erasure_coding/ec.go @@ -1,79 +1,785 @@ package erasure_coding import ( + "context" "fmt" + "io" + "math" + "os" + "path/filepath" + "strings" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) -// Task implements erasure coding operation to convert volumes to EC format +// Task implements comprehensive erasure coding with protobuf parameters type Task struct { - *tasks.BaseTask - server string - volumeID uint32 + *base.BaseTypedTask + + // Current task state + sourceServer string + volumeID uint32 + collection string + workDir string + masterClient string + grpcDialOpt grpc.DialOption + + // EC parameters from protobuf + destinations []*worker_pb.ECDestination // Disk-aware destinations + existingShardLocations []*worker_pb.ExistingECShardLocation // Existing shards to cleanup + estimatedShardSize uint64 + dataShards int + parityShards int + cleanupSource bool + + // Progress tracking + currentStep string + stepProgress map[string]float64 } -// NewTask creates a new erasure coding task instance -func NewTask(server string, volumeID uint32) *Task { +// NewTask creates a new erasure coding task +func NewTask() types.TypedTaskInterface { task := &Task{ - BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding), - server: server, - volumeID: volumeID, + BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeErasureCoding), + masterClient: "localhost:9333", // Default master client + workDir: "/tmp/seaweedfs_ec_work", // Default work directory + grpcDialOpt: grpc.WithTransportCredentials(insecure.NewCredentials()), // Default to insecure + dataShards: erasure_coding.DataShardsCount, // Use package constant + parityShards: erasure_coding.ParityShardsCount, // Use package constant + stepProgress: make(map[string]float64), } return task } -// Execute executes the erasure coding task -func (t *Task) Execute(params types.TaskParams) error { - glog.Infof("Starting erasure coding task for volume %d on server %s", t.volumeID, t.server) +// ValidateTyped validates the typed parameters for EC task +func (t *Task) ValidateTyped(params *worker_pb.TaskParams) error { + // Basic validation from base class + if err := t.BaseTypedTask.ValidateTyped(params); 
err != nil { + return err + } + + // Check that we have EC-specific parameters + ecParams := params.GetErasureCodingParams() + if ecParams == nil { + return fmt.Errorf("erasure_coding_params is required for EC task") + } + + // Require destinations + if len(ecParams.Destinations) == 0 { + return fmt.Errorf("destinations must be specified for EC task") + } + + // DataShards and ParityShards are constants from erasure_coding package + expectedDataShards := int32(erasure_coding.DataShardsCount) + expectedParityShards := int32(erasure_coding.ParityShardsCount) + + if ecParams.DataShards > 0 && ecParams.DataShards != expectedDataShards { + return fmt.Errorf("data_shards must be %d (fixed constant), got %d", expectedDataShards, ecParams.DataShards) + } + if ecParams.ParityShards > 0 && ecParams.ParityShards != expectedParityShards { + return fmt.Errorf("parity_shards must be %d (fixed constant), got %d", expectedParityShards, ecParams.ParityShards) + } + + // Validate destination count + destinationCount := len(ecParams.Destinations) + totalShards := expectedDataShards + expectedParityShards + if totalShards > int32(destinationCount) { + return fmt.Errorf("insufficient destinations: need %d, have %d", totalShards, destinationCount) + } + + return nil +} + +// EstimateTimeTyped estimates the time needed for EC processing based on protobuf parameters +func (t *Task) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration { + baseTime := 20 * time.Minute // Processing takes time due to comprehensive operations + + ecParams := params.GetErasureCodingParams() + if ecParams != nil && ecParams.EstimatedShardSize > 0 { + // More accurate estimate based on shard size + // Account for copying, encoding, and distribution + gbSize := ecParams.EstimatedShardSize / (1024 * 1024 * 1024) + estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB + if estimatedTime > baseTime { + return estimatedTime + } + } + + return baseTime +} + +// ExecuteTyped implements the actual erasure coding workflow with typed parameters +func (t *Task) ExecuteTyped(params *worker_pb.TaskParams) error { + // Extract basic parameters + t.volumeID = params.VolumeId + t.sourceServer = params.Server + t.collection = params.Collection - // Simulate erasure coding operation with progress updates - steps := []struct { - name string - duration time.Duration - progress float64 - }{ - {"Analyzing volume", 2 * time.Second, 15}, - {"Creating EC shards", 5 * time.Second, 50}, - {"Verifying shards", 2 * time.Second, 75}, - {"Finalizing EC volume", 1 * time.Second, 100}, + // Extract EC-specific parameters + ecParams := params.GetErasureCodingParams() + if ecParams != nil { + t.destinations = ecParams.Destinations // Store disk-aware destinations + t.existingShardLocations = ecParams.ExistingShardLocations // Store existing shards for cleanup + t.estimatedShardSize = ecParams.EstimatedShardSize + t.cleanupSource = ecParams.CleanupSource + + // DataShards and ParityShards are constants, don't override from parameters + // t.dataShards and t.parityShards are already set to constants in NewTask + + if ecParams.WorkingDir != "" { + t.workDir = ecParams.WorkingDir + } + if ecParams.MasterClient != "" { + t.masterClient = ecParams.MasterClient + } } - for _, step := range steps { - if t.IsCancelled() { - return fmt.Errorf("erasure coding task cancelled") + // Determine available destinations for logging + var availableDestinations []string + for _, dest := range t.destinations { + availableDestinations = 
append(availableDestinations, fmt.Sprintf("%s(disk:%d)", dest.Node, dest.DiskId)) + } + + glog.V(1).Infof("Starting EC task for volume %d: %s -> %v (data:%d, parity:%d)", + t.volumeID, t.sourceServer, availableDestinations, t.dataShards, t.parityShards) + + // Create unique working directory for this task + taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix())) + if err := os.MkdirAll(taskWorkDir, 0755); err != nil { + return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err) + } + glog.V(1).Infof("WORKFLOW: Created working directory: %s", taskWorkDir) + + // Ensure cleanup of working directory + defer func() { + if err := os.RemoveAll(taskWorkDir); err != nil { + glog.Warningf("Failed to cleanup working directory %s: %v", taskWorkDir, err) + } else { + glog.V(1).Infof("WORKFLOW: Cleaned up working directory: %s", taskWorkDir) } + }() + + // Step 1: Collect volume locations from master + glog.V(1).Infof("WORKFLOW STEP 1: Collecting volume locations from master") + t.SetProgress(5.0) + volumeId := needle.VolumeId(t.volumeID) + volumeLocations, err := t.collectVolumeLocations(volumeId) + if err != nil { + return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err) + } + glog.V(1).Infof("WORKFLOW: Found volume %d on %d servers: %v", t.volumeID, len(volumeLocations), volumeLocations) - glog.V(1).Infof("Erasure coding task step: %s", step.name) - t.SetProgress(step.progress) + // Convert ServerAddress slice to string slice + var locationStrings []string + for _, addr := range volumeLocations { + locationStrings = append(locationStrings, string(addr)) + } - // Simulate work - time.Sleep(step.duration) + // Step 2: Check if volume has sufficient size for EC encoding + if !t.shouldPerformECEncoding(locationStrings) { + glog.Infof("Volume %d does not meet EC encoding criteria, skipping", t.volumeID) + t.SetProgress(100.0) + return nil } - glog.Infof("Erasure coding task completed for volume %d on server %s", t.volumeID, t.server) + // Step 2A: Cleanup existing EC shards if any + glog.V(1).Infof("WORKFLOW STEP 2A: Cleaning up existing EC shards for volume %d", t.volumeID) + t.SetProgress(10.0) + err = t.cleanupExistingEcShards() + if err != nil { + glog.Warningf("Failed to cleanup existing EC shards (continuing anyway): %v", err) + // Don't fail the task - this is just cleanup + } + glog.V(1).Infof("WORKFLOW: Existing EC shards cleanup completed for volume %d", t.volumeID) + + // Step 3: Mark volume readonly on all servers + glog.V(1).Infof("WORKFLOW STEP 2B: Marking volume %d readonly on all replica servers", t.volumeID) + t.SetProgress(15.0) + err = t.markVolumeReadonlyOnAllReplicas(needle.VolumeId(t.volumeID), locationStrings) + if err != nil { + return fmt.Errorf("failed to mark volume readonly: %v", err) + } + glog.V(1).Infof("WORKFLOW: Volume %d marked readonly on all replicas", t.volumeID) + + // Step 5: Copy volume files (.dat, .idx) to EC worker + glog.V(1).Infof("WORKFLOW STEP 3: Copying volume files from source server %s to EC worker", t.sourceServer) + t.SetProgress(25.0) + localVolumeFiles, err := t.copyVolumeFilesToWorker(taskWorkDir) + if err != nil { + return fmt.Errorf("failed to copy volume files to EC worker: %v", err) + } + glog.V(1).Infof("WORKFLOW: Volume files copied to EC worker: %v", localVolumeFiles) + + // Step 6: Generate EC shards locally on EC worker + glog.V(1).Infof("WORKFLOW STEP 4: Generating EC shards locally on EC worker") + t.SetProgress(40.0) + localShardFiles, err := 
t.generateEcShardsLocally(localVolumeFiles, taskWorkDir) + if err != nil { + return fmt.Errorf("failed to generate EC shards locally: %v", err) + } + glog.V(1).Infof("WORKFLOW: EC shards generated locally: %d shard files", len(localShardFiles)) + + // Step 7: Distribute shards from EC worker to destination servers + glog.V(1).Infof("WORKFLOW STEP 5: Distributing EC shards from worker to destination servers") + t.SetProgress(60.0) + err = t.distributeEcShardsFromWorker(localShardFiles) + if err != nil { + return fmt.Errorf("failed to distribute EC shards from worker: %v", err) + } + glog.V(1).Infof("WORKFLOW: EC shards distributed to all destination servers") + + // Step 8: Mount EC shards on destination servers + glog.V(1).Infof("WORKFLOW STEP 6: Mounting EC shards on destination servers") + t.SetProgress(80.0) + err = t.mountEcShardsOnDestinations() + if err != nil { + return fmt.Errorf("failed to mount EC shards: %v", err) + } + glog.V(1).Infof("WORKFLOW: EC shards mounted successfully") + + // Step 9: Delete original volume from all locations + glog.V(1).Infof("WORKFLOW STEP 7: Deleting original volume %d from all replica servers", t.volumeID) + t.SetProgress(90.0) + err = t.deleteVolumeFromAllLocations(needle.VolumeId(t.volumeID), locationStrings) + if err != nil { + return fmt.Errorf("failed to delete original volume: %v", err) + } + glog.V(1).Infof("WORKFLOW: Original volume %d deleted from all locations", t.volumeID) + + t.SetProgress(100.0) + glog.Infof("EC task completed successfully for volume %d", t.volumeID) return nil } -// Validate validates the task parameters -func (t *Task) Validate(params types.TaskParams) error { - if params.VolumeID == 0 { - return fmt.Errorf("volume_id is required") +// collectVolumeLocations gets volume location from master (placeholder implementation) +func (t *Task) collectVolumeLocations(volumeId needle.VolumeId) ([]pb.ServerAddress, error) { + // For now, return a placeholder implementation + // Full implementation would call master to get volume locations + return []pb.ServerAddress{pb.ServerAddress(t.sourceServer)}, nil +} + +// cleanupExistingEcShards deletes existing EC shards using planned locations +func (t *Task) cleanupExistingEcShards() error { + if len(t.existingShardLocations) == 0 { + glog.V(1).Infof("No existing EC shards to cleanup for volume %d", t.volumeID) + return nil } - if params.Server == "" { - return fmt.Errorf("server is required") + + glog.V(1).Infof("Cleaning up existing EC shards for volume %d on %d servers", t.volumeID, len(t.existingShardLocations)) + + // Delete existing shards from each location using planned shard locations + for _, location := range t.existingShardLocations { + if len(location.ShardIds) == 0 { + continue + } + + glog.V(1).Infof("Deleting existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID) + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(location.Node), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + ShardIds: location.ShardIds, + }) + return deleteErr + }) + + if err != nil { + glog.Errorf("Failed to delete existing EC shards %v from %s for volume %d: %v", location.ShardIds, location.Node, t.volumeID, err) + // Continue with other servers - don't fail the entire cleanup + } else { + glog.V(1).Infof("Successfully deleted existing EC shards %v from %s 
for volume %d", location.ShardIds, location.Node, t.volumeID) + } } + + glog.V(1).Infof("Completed cleanup of existing EC shards for volume %d", t.volumeID) return nil } -// EstimateTime estimates the time needed for the task -func (t *Task) EstimateTime(params types.TaskParams) time.Duration { - // Base time for erasure coding operation - baseTime := 30 * time.Second +// shouldPerformECEncoding checks if the volume meets criteria for EC encoding +func (t *Task) shouldPerformECEncoding(volumeLocations []string) bool { + // For now, always proceed with EC encoding if volume exists + // This can be extended with volume size checks, etc. + return len(volumeLocations) > 0 +} - // Could adjust based on volume size or other factors - return baseTime +// markVolumeReadonlyOnAllReplicas marks the volume as readonly on all replica servers +func (t *Task) markVolumeReadonlyOnAllReplicas(volumeId needle.VolumeId, volumeLocations []string) error { + glog.V(1).Infof("Marking volume %d readonly on %d servers", volumeId, len(volumeLocations)) + + // Mark volume readonly on all replica servers + for _, location := range volumeLocations { + glog.V(1).Infof("Marking volume %d readonly on %s", volumeId, location) + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, markErr := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: uint32(volumeId), + }) + return markErr + }) + + if err != nil { + glog.Errorf("Failed to mark volume %d readonly on %s: %v", volumeId, location, err) + return fmt.Errorf("failed to mark volume %d readonly on %s: %v", volumeId, location, err) + } + + glog.V(1).Infof("Successfully marked volume %d readonly on %s", volumeId, location) + } + + glog.V(1).Infof("Successfully marked volume %d readonly on all %d locations", volumeId, len(volumeLocations)) + return nil +} + +// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker +func (t *Task) copyVolumeFilesToWorker(workDir string) (map[string]string, error) { + localFiles := make(map[string]string) + + // Copy .dat file + datFile := fmt.Sprintf("%s.dat", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID))) + err := t.copyFileFromSource(".dat", datFile) + if err != nil { + return nil, fmt.Errorf("failed to copy .dat file: %v", err) + } + localFiles["dat"] = datFile + glog.V(1).Infof("Copied .dat file to: %s", datFile) + + // Copy .idx file + idxFile := fmt.Sprintf("%s.idx", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID))) + err = t.copyFileFromSource(".idx", idxFile) + if err != nil { + return nil, fmt.Errorf("failed to copy .idx file: %v", err) + } + localFiles["idx"] = idxFile + glog.V(1).Infof("Copied .idx file to: %s", idxFile) + + return localFiles, nil +} + +// copyFileFromSource copies a file from source server to local path using gRPC streaming +func (t *Task) copyFileFromSource(ext, localPath string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(t.sourceServer), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + Ext: ext, + StopOffset: uint64(math.MaxInt64), + }) + if err != nil { + return fmt.Errorf("failed to initiate file copy: %v", err) + } + + // Create local file + localFile, err := os.Create(localPath) + if err != nil { + return 
fmt.Errorf("failed to create local file %s: %v", localPath, err) + } + defer localFile.Close() + + // Stream data and write to local file + totalBytes := int64(0) + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to receive file data: %v", err) + } + + if len(resp.FileContent) > 0 { + written, writeErr := localFile.Write(resp.FileContent) + if writeErr != nil { + return fmt.Errorf("failed to write to local file: %v", writeErr) + } + totalBytes += int64(written) + } + } + + glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.sourceServer, localPath) + return nil + }) +} + +// generateEcShardsLocally generates EC shards from local volume files +func (t *Task) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) { + datFile := localFiles["dat"] + idxFile := localFiles["idx"] + + if datFile == "" || idxFile == "" { + return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile) + } + + // Get base name without extension for EC operations + baseName := strings.TrimSuffix(datFile, ".dat") + + shardFiles := make(map[string]string) + + glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile) + + // Generate EC shard files (.ec00 ~ .ec13) + if err := erasure_coding.WriteEcFiles(baseName); err != nil { + return nil, fmt.Errorf("failed to generate EC shard files: %v", err) + } + + // Generate .ecx file from .idx + if err := erasure_coding.WriteSortedFileFromIdx(idxFile, ".ecx"); err != nil { + return nil, fmt.Errorf("failed to generate .ecx file: %v", err) + } + + // Collect generated shard file paths + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardFile := fmt.Sprintf("%s.ec%02d", baseName, i) + if _, err := os.Stat(shardFile); err == nil { + shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile + } + } + + // Add metadata files + ecxFile := idxFile + ".ecx" + if _, err := os.Stat(ecxFile); err == nil { + shardFiles["ecx"] = ecxFile + } + + // Generate .vif file (volume info) + vifFile := baseName + ".vif" + // Create basic volume info - in a real implementation, this would come from the original volume + volumeInfo := &volume_server_pb.VolumeInfo{ + Version: uint32(needle.GetCurrentVersion()), + } + if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil { + glog.Warningf("Failed to create .vif file: %v", err) + } else { + shardFiles["vif"] = vifFile + } + + glog.V(1).Infof("Generated %d EC files locally", len(shardFiles)) + return shardFiles, nil +} + +func (t *Task) copyEcShardsToDestinations() error { + if len(t.destinations) == 0 { + return fmt.Errorf("no destinations specified for EC shard distribution") + } + + destinations := t.destinations + + glog.V(1).Infof("Copying EC shards for volume %d to %d destinations", t.volumeID, len(destinations)) + + // Prepare shard IDs (0-13 for EC shards) + var shardIds []uint32 + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardIds = append(shardIds, uint32(i)) + } + + // Distribute shards across destinations + var wg sync.WaitGroup + errorChan := make(chan error, len(destinations)) + + // Track which disks have already received metadata files (server+disk) + metadataFilesCopied := make(map[string]bool) + var metadataMutex sync.Mutex + + // For each destination, copy a subset of shards + shardsPerDest := len(shardIds) / len(destinations) + remainder := len(shardIds) % len(destinations) + + shardOffset := 0 + for i, 
dest := range destinations { + wg.Add(1) + + shardsForThisDest := shardsPerDest + if i < remainder { + shardsForThisDest++ // Distribute remainder shards + } + + destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest] + shardOffset += shardsForThisDest + + go func(destination *worker_pb.ECDestination, targetShardIds []uint32) { + defer wg.Done() + + if t.IsCancelled() { + errorChan <- fmt.Errorf("task cancelled during shard copy") + return + } + + // Create disk-specific metadata key (server+disk) + diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId) + + glog.V(1).Infof("Copying shards %v from %s to %s (disk %d)", + targetShardIds, t.sourceServer, destination.Node, destination.DiskId) + + // Check if this disk needs metadata files (only once per disk) + metadataMutex.Lock() + needsMetadataFiles := !metadataFilesCopied[diskKey] + if needsMetadataFiles { + metadataFilesCopied[diskKey] = true + } + metadataMutex.Unlock() + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(t.volumeID), + Collection: t.collection, + ShardIds: targetShardIds, + CopyEcxFile: needsMetadataFiles, // Copy .ecx only once per disk + CopyEcjFile: needsMetadataFiles, // Copy .ecj only once per disk + CopyVifFile: needsMetadataFiles, // Copy .vif only once per disk + SourceDataNode: t.sourceServer, + DiskId: destination.DiskId, // Pass target disk ID + }) + return copyErr + }) + + if err != nil { + errorChan <- fmt.Errorf("failed to copy shards to %s disk %d: %v", destination.Node, destination.DiskId, err) + return + } + + if needsMetadataFiles { + glog.V(1).Infof("Successfully copied shards %v and metadata files (.ecx, .ecj, .vif) to %s disk %d", + targetShardIds, destination.Node, destination.DiskId) + } else { + glog.V(1).Infof("Successfully copied shards %v to %s disk %d (metadata files already present)", + targetShardIds, destination.Node, destination.DiskId) + } + }(dest, destShardIds) + } + + wg.Wait() + close(errorChan) + + // Check for any copy errors + if err := <-errorChan; err != nil { + return err + } + + glog.V(1).Infof("Successfully copied all EC shards for volume %d", t.volumeID) + return nil +} + +// distributeEcShardsFromWorker distributes locally generated EC shards to destination servers +func (t *Task) distributeEcShardsFromWorker(localShardFiles map[string]string) error { + if len(t.destinations) == 0 { + return fmt.Errorf("no destinations specified for EC shard distribution") + } + + destinations := t.destinations + + glog.V(1).Infof("Distributing EC shards for volume %d from worker to %d destinations", t.volumeID, len(destinations)) + + // Prepare shard IDs (0-13 for EC shards) + var shardIds []uint32 + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardIds = append(shardIds, uint32(i)) + } + + // Distribute shards across destinations + var wg sync.WaitGroup + errorChan := make(chan error, len(destinations)) + + // Track which disks have already received metadata files (server+disk) + metadataFilesCopied := make(map[string]bool) + var metadataMutex sync.Mutex + + // For each destination, send a subset of shards + shardsPerDest := len(shardIds) / len(destinations) + remainder := len(shardIds) % len(destinations) + + shardOffset := 0 + for i, dest := range destinations { + wg.Add(1) + + shardsForThisDest := shardsPerDest + 
if i < remainder { + shardsForThisDest++ // Distribute remainder shards + } + + destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest] + shardOffset += shardsForThisDest + + go func(destination *worker_pb.ECDestination, targetShardIds []uint32) { + defer wg.Done() + + if t.IsCancelled() { + errorChan <- fmt.Errorf("task cancelled during shard distribution") + return + } + + // Create disk-specific metadata key (server+disk) + diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId) + + glog.V(1).Infof("Distributing shards %v from worker to %s (disk %d)", + targetShardIds, destination.Node, destination.DiskId) + + // Check if this disk needs metadata files (only once per disk) + metadataMutex.Lock() + needsMetadataFiles := !metadataFilesCopied[diskKey] + if needsMetadataFiles { + metadataFilesCopied[diskKey] = true + } + metadataMutex.Unlock() + + // Send shard files to destination using HTTP upload (simplified for now) + err := t.sendShardsToDestination(destination, targetShardIds, localShardFiles, needsMetadataFiles) + if err != nil { + errorChan <- fmt.Errorf("failed to send shards to %s disk %d: %v", destination.Node, destination.DiskId, err) + return + } + + if needsMetadataFiles { + glog.V(1).Infof("Successfully distributed shards %v and metadata files (.ecx, .vif) to %s disk %d", + targetShardIds, destination.Node, destination.DiskId) + } else { + glog.V(1).Infof("Successfully distributed shards %v to %s disk %d (metadata files already present)", + targetShardIds, destination.Node, destination.DiskId) + } + }(dest, destShardIds) + } + + wg.Wait() + close(errorChan) + + // Check for any distribution errors + if err := <-errorChan; err != nil { + return err + } + + glog.V(1).Infof("Completed distributing EC shards for volume %d", t.volumeID) + return nil +} + +// sendShardsToDestination sends specific shard files from worker to a destination server (simplified) +func (t *Task) sendShardsToDestination(destination *worker_pb.ECDestination, shardIds []uint32, localFiles map[string]string, includeMetadata bool) error { + // For now, use a simplified approach - just upload the files + // In a full implementation, this would use proper file upload mechanisms + glog.V(2).Infof("Would send shards %v and metadata=%v to %s disk %d", shardIds, includeMetadata, destination.Node, destination.DiskId) + + // TODO: Implement actual file upload to volume server + // This is a placeholder - actual implementation would: + // 1. Open each shard file locally + // 2. Upload via HTTP POST or gRPC stream to destination volume server + // 3. 
Volume server would save to the specified disk_id + + return nil +} + +// mountEcShardsOnDestinations mounts EC shards on all destination servers +func (t *Task) mountEcShardsOnDestinations() error { + if len(t.destinations) == 0 { + return fmt.Errorf("no destinations specified for mounting EC shards") + } + + destinations := t.destinations + + glog.V(1).Infof("Mounting EC shards for volume %d on %d destinations", t.volumeID, len(destinations)) + + // Prepare all shard IDs (0-13) + var allShardIds []uint32 + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + allShardIds = append(allShardIds, uint32(i)) + } + + var wg sync.WaitGroup + errorChan := make(chan error, len(destinations)) + + // Mount shards on each destination server + for _, dest := range destinations { + wg.Add(1) + + go func(destination *worker_pb.ECDestination) { + defer wg.Done() + + if t.IsCancelled() { + errorChan <- fmt.Errorf("task cancelled during shard mounting") + return + } + + glog.V(1).Infof("Mounting EC shards on %s disk %d", destination.Node, destination.DiskId) + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: uint32(t.volumeID), + Collection: t.collection, + ShardIds: allShardIds, // Mount all available shards on each server + }) + return mountErr + }) + + if err != nil { + // It's normal for some servers to not have all shards, so log as warning rather than error + glog.Warningf("Failed to mount some shards on %s disk %d (this may be normal): %v", destination.Node, destination.DiskId, err) + } else { + glog.V(1).Infof("Successfully mounted EC shards on %s disk %d", destination.Node, destination.DiskId) + } + }(dest) + } + + wg.Wait() + close(errorChan) + + // Check for any critical mounting errors + select { + case err := <-errorChan: + if err != nil { + glog.Warningf("Some shard mounting issues occurred: %v", err) + } + default: + // No errors + } + + glog.V(1).Infof("Completed mounting EC shards for volume %d", t.volumeID) + return nil +} + +// deleteVolumeFromAllLocations deletes the original volume from all replica servers +func (t *Task) deleteVolumeFromAllLocations(volumeId needle.VolumeId, volumeLocations []string) error { + glog.V(1).Infof("Deleting original volume %d from %d locations", volumeId, len(volumeLocations)) + + for _, location := range volumeLocations { + glog.V(1).Infof("Deleting volume %d from %s", volumeId, location) + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ + VolumeId: uint32(volumeId), + OnlyEmpty: false, // Force delete even if not empty since we've already created EC shards + }) + return deleteErr + }) + + if err != nil { + glog.Errorf("Failed to delete volume %d from %s: %v", volumeId, location, err) + return fmt.Errorf("failed to delete volume %d from %s: %v", volumeId, location, err) + } + + glog.V(1).Infof("Successfully deleted volume %d from %s", volumeId, location) + } + + glog.V(1).Infof("Successfully deleted volume %d from all %d locations", volumeId, len(volumeLocations)) + return nil +} + +// Register the task in the global registry +func init() { + types.RegisterGlobalTypedTask(types.TaskTypeErasureCoding, NewTask) + 
glog.V(1).Infof("Registered EC task") } diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go deleted file mode 100644 index 0f8b5e376..000000000 --- a/weed/worker/tasks/erasure_coding/ec_detector.go +++ /dev/null @@ -1,139 +0,0 @@ -package erasure_coding - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// EcDetector implements erasure coding task detection -type EcDetector struct { - enabled bool - volumeAgeHours int - fullnessRatio float64 - scanInterval time.Duration -} - -// Compile-time interface assertions -var ( - _ types.TaskDetector = (*EcDetector)(nil) -) - -// NewEcDetector creates a new erasure coding detector -func NewEcDetector() *EcDetector { - return &EcDetector{ - enabled: false, // Conservative default - volumeAgeHours: 24 * 7, // 1 week - fullnessRatio: 0.9, // 90% full - scanInterval: 2 * time.Hour, - } -} - -// GetTaskType returns the task type -func (d *EcDetector) GetTaskType() types.TaskType { - return types.TaskTypeErasureCoding -} - -// ScanForTasks scans for volumes that should be converted to erasure coding -func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { - if !d.enabled { - return nil, nil - } - - var results []*types.TaskDetectionResult - now := time.Now() - ageThreshold := time.Duration(d.volumeAgeHours) * time.Hour - - for _, metric := range volumeMetrics { - // Skip if already EC volume - if metric.IsECVolume { - continue - } - - // Check age and fullness criteria - if metric.Age >= ageThreshold && metric.FullnessRatio >= d.fullnessRatio { - // Check if volume is read-only (safe for EC conversion) - if !metric.IsReadOnly { - continue - } - - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeErasureCoding, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: types.TaskPriorityLow, // EC is not urgent - Reason: "Volume is old and full enough for EC conversion", - Parameters: map[string]interface{}{ - "age_hours": int(metric.Age.Hours()), - "fullness_ratio": metric.FullnessRatio, - }, - ScheduleAt: now, - } - results = append(results, result) - } - } - - glog.V(2).Infof("EC detector found %d tasks to schedule", len(results)) - return results, nil -} - -// ScanInterval returns how often this task type should be scanned -func (d *EcDetector) ScanInterval() time.Duration { - return d.scanInterval -} - -// IsEnabled returns whether this task type is enabled -func (d *EcDetector) IsEnabled() bool { - return d.enabled -} - -// Configuration setters - -func (d *EcDetector) SetEnabled(enabled bool) { - d.enabled = enabled -} - -func (d *EcDetector) SetVolumeAgeHours(hours int) { - d.volumeAgeHours = hours -} - -func (d *EcDetector) SetFullnessRatio(ratio float64) { - d.fullnessRatio = ratio -} - -func (d *EcDetector) SetScanInterval(interval time.Duration) { - d.scanInterval = interval -} - -// GetVolumeAgeHours returns the current volume age threshold in hours -func (d *EcDetector) GetVolumeAgeHours() int { - return d.volumeAgeHours -} - -// GetFullnessRatio returns the current fullness ratio threshold -func (d *EcDetector) GetFullnessRatio() float64 { - return d.fullnessRatio -} - -// GetScanInterval returns the scan interval -func (d *EcDetector) GetScanInterval() time.Duration { - return d.scanInterval -} - -// ConfigureFromPolicy configures the detector based on the 
maintenance policy -func (d *EcDetector) ConfigureFromPolicy(policy interface{}) { - // Type assert to the maintenance policy type we expect - if maintenancePolicy, ok := policy.(interface { - GetECEnabled() bool - GetECVolumeAgeHours() int - GetECFullnessRatio() float64 - }); ok { - d.SetEnabled(maintenancePolicy.GetECEnabled()) - d.SetVolumeAgeHours(maintenancePolicy.GetECVolumeAgeHours()) - d.SetFullnessRatio(maintenancePolicy.GetECFullnessRatio()) - } else { - glog.V(1).Infof("Could not configure EC detector from policy: unsupported policy type") - } -} diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go index 6c4b5bf7f..62cfe6b56 100644 --- a/weed/worker/tasks/erasure_coding/ec_register.go +++ b/weed/worker/tasks/erasure_coding/ec_register.go @@ -2,80 +2,71 @@ package erasure_coding import ( "fmt" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// Factory creates erasure coding task instances -type Factory struct { - *tasks.BaseTaskFactory -} +// Global variable to hold the task definition for configuration updates +var globalTaskDef *base.TaskDefinition -// NewFactory creates a new erasure coding task factory -func NewFactory() *Factory { - return &Factory{ - BaseTaskFactory: tasks.NewBaseTaskFactory( - types.TaskTypeErasureCoding, - []string{"erasure_coding", "storage", "durability"}, - "Convert volumes to erasure coded format for improved durability", - ), - } -} - -// Create creates a new erasure coding task instance -func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - task := NewTask(params.Server, params.VolumeID) - task.SetEstimatedDuration(task.EstimateTime(params)) +// Auto-register this task when the package is imported +func init() { + RegisterErasureCodingTask() - return task, nil + // Register config updater + tasks.AutoRegisterConfigUpdater(types.TaskTypeErasureCoding, UpdateConfigFromPersistence) } -// Shared detector and scheduler instances -var ( - sharedDetector *EcDetector - sharedScheduler *Scheduler -) +// RegisterErasureCodingTask registers the erasure coding task with the new architecture +func RegisterErasureCodingTask() { + // Create configuration instance + config := NewDefaultConfig() -// getSharedInstances returns the shared detector and scheduler instances -func getSharedInstances() (*EcDetector, *Scheduler) { - if sharedDetector == nil { - sharedDetector = NewEcDetector() - } - if sharedScheduler == nil { - sharedScheduler = NewScheduler() + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeErasureCoding, + Name: "erasure_coding", + DisplayName: "Erasure Coding", + Description: "Applies erasure coding to volumes for data protection", + Icon: "fas fa-shield-alt text-success", + Capabilities: []string{"erasure_coding", "data_protection"}, + + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: nil, // Uses typed task system - see init() in ec.go + DetectionFunc: Detection, + ScanInterval: 1 * time.Hour, + SchedulingFunc: Scheduling, + MaxConcurrent: 1, + RepeatInterval: 24 * time.Hour, } - return sharedDetector, sharedScheduler -} -// GetSharedInstances returns the 
shared detector and scheduler instances (public access) -func GetSharedInstances() (*EcDetector, *Scheduler) { - return getSharedInstances() + // Store task definition globally for configuration updates + globalTaskDef = taskDef + + // Register everything with a single function call! + base.RegisterTask(taskDef) } -// Auto-register this task when the package is imported -func init() { - factory := NewFactory() - tasks.AutoRegister(types.TaskTypeErasureCoding, factory) +// UpdateConfigFromPersistence updates the erasure coding configuration from persistence +func UpdateConfigFromPersistence(configPersistence interface{}) error { + if globalTaskDef == nil { + return fmt.Errorf("erasure coding task not registered") + } - // Get shared instances for all registrations - detector, scheduler := getSharedInstances() + // Load configuration from persistence + newConfig := LoadConfigFromPersistence(configPersistence) + if newConfig == nil { + return fmt.Errorf("failed to load configuration from persistence") + } - // Register with types registry - tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { - registry.RegisterTask(detector, scheduler) - }) + // Update the task definition's config + globalTaskDef.Config = newConfig - // Register with UI registry using the same instances - tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { - RegisterUI(uiRegistry, detector, scheduler) - }) + glog.V(1).Infof("Updated erasure coding task configuration from persistence") + return nil } diff --git a/weed/worker/tasks/erasure_coding/ec_scheduler.go b/weed/worker/tasks/erasure_coding/ec_scheduler.go deleted file mode 100644 index b2366bb06..000000000 --- a/weed/worker/tasks/erasure_coding/ec_scheduler.go +++ /dev/null @@ -1,114 +0,0 @@ -package erasure_coding - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Scheduler implements erasure coding task scheduling -type Scheduler struct { - maxConcurrent int - enabled bool -} - -// NewScheduler creates a new erasure coding scheduler -func NewScheduler() *Scheduler { - return &Scheduler{ - maxConcurrent: 1, // Conservative default - enabled: false, // Conservative default - } -} - -// GetTaskType returns the task type -func (s *Scheduler) GetTaskType() types.TaskType { - return types.TaskTypeErasureCoding -} - -// CanScheduleNow determines if an erasure coding task can be scheduled now -func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { - if !s.enabled { - return false - } - - // Check if we have available workers - if len(availableWorkers) == 0 { - return false - } - - // Count running EC tasks - runningCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeErasureCoding { - runningCount++ - } - } - - // Check concurrency limit - if runningCount >= s.maxConcurrent { - glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent) - return false - } - - // Check if any worker can handle EC tasks - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeErasureCoding { - glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID) - return true - } - } - } - - return false -} - -// GetMaxConcurrent returns the maximum number of concurrent tasks -func (s *Scheduler) GetMaxConcurrent() int { - return s.maxConcurrent -} - -// GetDefaultRepeatInterval 
returns the default interval to wait before repeating EC tasks -func (s *Scheduler) GetDefaultRepeatInterval() time.Duration { - return 24 * time.Hour // Don't repeat EC for 24 hours -} - -// GetPriority returns the priority for this task -func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority { - return types.TaskPriorityLow // EC is not urgent -} - -// WasTaskRecentlyCompleted checks if a similar task was recently completed -func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool { - // Don't repeat EC for 24 hours - interval := 24 * time.Hour - cutoff := now.Add(-interval) - - for _, completedTask := range completedTasks { - if completedTask.Type == types.TaskTypeErasureCoding && - completedTask.VolumeID == task.VolumeID && - completedTask.Server == task.Server && - completedTask.Status == types.TaskStatusCompleted && - completedTask.CompletedAt != nil && - completedTask.CompletedAt.After(cutoff) { - return true - } - } - return false -} - -// IsEnabled returns whether this task type is enabled -func (s *Scheduler) IsEnabled() bool { - return s.enabled -} - -// Configuration setters - -func (s *Scheduler) SetEnabled(enabled bool) { - s.enabled = enabled -} - -func (s *Scheduler) SetMaxConcurrent(max int) { - s.maxConcurrent = max -} diff --git a/weed/worker/tasks/erasure_coding/ui.go b/weed/worker/tasks/erasure_coding/ui.go deleted file mode 100644 index e17cba89a..000000000 --- a/weed/worker/tasks/erasure_coding/ui.go +++ /dev/null @@ -1,309 +0,0 @@ -package erasure_coding - -import ( - "fmt" - "html/template" - "strconv" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// UIProvider provides the UI for erasure coding task configuration -type UIProvider struct { - detector *EcDetector - scheduler *Scheduler -} - -// NewUIProvider creates a new erasure coding UI provider -func NewUIProvider(detector *EcDetector, scheduler *Scheduler) *UIProvider { - return &UIProvider{ - detector: detector, - scheduler: scheduler, - } -} - -// GetTaskType returns the task type -func (ui *UIProvider) GetTaskType() types.TaskType { - return types.TaskTypeErasureCoding -} - -// GetDisplayName returns the human-readable name -func (ui *UIProvider) GetDisplayName() string { - return "Erasure Coding" -} - -// GetDescription returns a description of what this task does -func (ui *UIProvider) GetDescription() string { - return "Converts volumes to erasure coded format for improved data durability and fault tolerance" -} - -// GetIcon returns the icon CSS class for this task type -func (ui *UIProvider) GetIcon() string { - return "fas fa-shield-alt text-info" -} - -// ErasureCodingConfig represents the erasure coding configuration -type ErasureCodingConfig struct { - Enabled bool `json:"enabled"` - VolumeAgeHoursSeconds int `json:"volume_age_hours_seconds"` - FullnessRatio float64 `json:"fullness_ratio"` - ScanIntervalSeconds int `json:"scan_interval_seconds"` - MaxConcurrent int `json:"max_concurrent"` - ShardCount int `json:"shard_count"` - ParityCount int `json:"parity_count"` - CollectionFilter string `json:"collection_filter"` -} - -// Helper functions for duration conversion -func secondsToDuration(seconds int) time.Duration { - return time.Duration(seconds) * time.Second -} - -func durationToSeconds(d time.Duration) int { - return int(d.Seconds()) -} - -// formatDurationForUser formats seconds as a user-friendly duration string -func formatDurationForUser(seconds int) 
string { - d := secondsToDuration(seconds) - if d < time.Minute { - return fmt.Sprintf("%ds", seconds) - } - if d < time.Hour { - return fmt.Sprintf("%.0fm", d.Minutes()) - } - if d < 24*time.Hour { - return fmt.Sprintf("%.1fh", d.Hours()) - } - return fmt.Sprintf("%.1fd", d.Hours()/24) -} - -// RenderConfigForm renders the configuration form HTML -func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) { - config := ui.getCurrentECConfig() - - // Build form using the FormBuilder helper - form := types.NewFormBuilder() - - // Detection Settings - form.AddCheckboxField( - "enabled", - "Enable Erasure Coding Tasks", - "Whether erasure coding tasks should be automatically created", - config.Enabled, - ) - - form.AddNumberField( - "volume_age_hours_seconds", - "Volume Age Threshold", - "Only apply erasure coding to volumes older than this duration", - float64(config.VolumeAgeHoursSeconds), - true, - ) - - form.AddNumberField( - "scan_interval_seconds", - "Scan Interval", - "How often to scan for volumes needing erasure coding", - float64(config.ScanIntervalSeconds), - true, - ) - - // Scheduling Settings - form.AddNumberField( - "max_concurrent", - "Max Concurrent Tasks", - "Maximum number of erasure coding tasks that can run simultaneously", - float64(config.MaxConcurrent), - true, - ) - - // Erasure Coding Parameters - form.AddNumberField( - "shard_count", - "Data Shards", - "Number of data shards for erasure coding (recommended: 10)", - float64(config.ShardCount), - true, - ) - - form.AddNumberField( - "parity_count", - "Parity Shards", - "Number of parity shards for erasure coding (recommended: 4)", - float64(config.ParityCount), - true, - ) - - // Generate organized form sections using Bootstrap components - html := ` -<div class="row"> - <div class="col-12"> - <div class="card mb-4"> - <div class="card-header"> - <h5 class="mb-0"> - <i class="fas fa-shield-alt me-2"></i> - Erasure Coding Configuration - </h5> - </div> - <div class="card-body"> -` + string(form.Build()) + ` - </div> - </div> - </div> -</div> - -<div class="row"> - <div class="col-12"> - <div class="card mb-3"> - <div class="card-header"> - <h5 class="mb-0"> - <i class="fas fa-info-circle me-2"></i> - Performance Impact - </h5> - </div> - <div class="card-body"> - <div class="alert alert-info" role="alert"> - <h6 class="alert-heading">Important Notes:</h6> - <p class="mb-2"><strong>Performance:</strong> Erasure coding is CPU and I/O intensive. 
Consider running during off-peak hours.</p> - <p class="mb-0"><strong>Durability:</strong> With ` + fmt.Sprintf("%d+%d", config.ShardCount, config.ParityCount) + ` configuration, can tolerate up to ` + fmt.Sprintf("%d", config.ParityCount) + ` shard failures.</p> - </div> - </div> - </div> - </div> -</div>` - - return template.HTML(html), nil -} - -// ParseConfigForm parses form data into configuration -func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) { - config := &ErasureCodingConfig{} - - // Parse enabled - config.Enabled = len(formData["enabled"]) > 0 - - // Parse volume age hours - if values, ok := formData["volume_age_hours_seconds"]; ok && len(values) > 0 { - hours, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid volume age hours: %w", err) - } - config.VolumeAgeHoursSeconds = hours - } - - // Parse scan interval - if values, ok := formData["scan_interval_seconds"]; ok && len(values) > 0 { - interval, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid scan interval: %w", err) - } - config.ScanIntervalSeconds = interval - } - - // Parse max concurrent - if values, ok := formData["max_concurrent"]; ok && len(values) > 0 { - maxConcurrent, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid max concurrent: %w", err) - } - if maxConcurrent < 1 { - return nil, fmt.Errorf("max concurrent must be at least 1") - } - config.MaxConcurrent = maxConcurrent - } - - // Parse shard count - if values, ok := formData["shard_count"]; ok && len(values) > 0 { - shardCount, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid shard count: %w", err) - } - if shardCount < 1 { - return nil, fmt.Errorf("shard count must be at least 1") - } - config.ShardCount = shardCount - } - - // Parse parity count - if values, ok := formData["parity_count"]; ok && len(values) > 0 { - parityCount, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid parity count: %w", err) - } - if parityCount < 1 { - return nil, fmt.Errorf("parity count must be at least 1") - } - config.ParityCount = parityCount - } - - return config, nil -} - -// GetCurrentConfig returns the current configuration -func (ui *UIProvider) GetCurrentConfig() interface{} { - return ui.getCurrentECConfig() -} - -// ApplyConfig applies the new configuration -func (ui *UIProvider) ApplyConfig(config interface{}) error { - ecConfig, ok := config.(ErasureCodingConfig) - if !ok { - return fmt.Errorf("invalid config type, expected ErasureCodingConfig") - } - - // Apply to detector - if ui.detector != nil { - ui.detector.SetEnabled(ecConfig.Enabled) - ui.detector.SetVolumeAgeHours(ecConfig.VolumeAgeHoursSeconds) - ui.detector.SetScanInterval(secondsToDuration(ecConfig.ScanIntervalSeconds)) - } - - // Apply to scheduler - if ui.scheduler != nil { - ui.scheduler.SetEnabled(ecConfig.Enabled) - ui.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent) - } - - glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, age_threshold=%v, max_concurrent=%d, shards=%d+%d", - ecConfig.Enabled, ecConfig.VolumeAgeHoursSeconds, ecConfig.MaxConcurrent, ecConfig.ShardCount, ecConfig.ParityCount) - - return nil -} - -// getCurrentECConfig gets the current configuration from detector and scheduler -func (ui *UIProvider) getCurrentECConfig() ErasureCodingConfig { - config := ErasureCodingConfig{ - // Default values (fallback if detectors/schedulers are nil) - Enabled: true, - 
VolumeAgeHoursSeconds: 24 * 3600, // 24 hours in seconds - ScanIntervalSeconds: 2 * 3600, // 2 hours in seconds - MaxConcurrent: 1, - ShardCount: 10, - ParityCount: 4, - } - - // Get current values from detector - if ui.detector != nil { - config.Enabled = ui.detector.IsEnabled() - config.VolumeAgeHoursSeconds = ui.detector.GetVolumeAgeHours() - config.ScanIntervalSeconds = durationToSeconds(ui.detector.ScanInterval()) - } - - // Get current values from scheduler - if ui.scheduler != nil { - config.MaxConcurrent = ui.scheduler.GetMaxConcurrent() - } - - return config -} - -// RegisterUI registers the erasure coding UI provider with the UI registry -func RegisterUI(uiRegistry *types.UIRegistry, detector *EcDetector, scheduler *Scheduler) { - uiProvider := NewUIProvider(detector, scheduler) - uiRegistry.RegisterUI(uiProvider) - - glog.V(1).Infof("✅ Registered erasure coding task UI provider") -} |
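The shard-placement math used by copyEcShardsToDestinations and distributeEcShardsFromWorker above gives every destination an even share of the 14 shard IDs and spreads the remainder over the first destinations. A self-contained sketch of that split, under the assumption of the same even-share-plus-remainder rule (names here are illustrative, not the SeaweedFS API):

package main

import "fmt"

// splitShards divides shard IDs 0..totalShards-1 across destinations the way
// the EC task does: an even share per destination, with the first
// (totalShards % destinations) destinations taking one extra shard.
func splitShards(totalShards, destinations int) [][]uint32 {
	shardIds := make([]uint32, totalShards)
	for i := range shardIds {
		shardIds[i] = uint32(i)
	}

	perDest := totalShards / destinations
	remainder := totalShards % destinations

	groups := make([][]uint32, 0, destinations)
	offset := 0
	for i := 0; i < destinations; i++ {
		n := perDest
		if i < remainder {
			n++ // first `remainder` destinations absorb the leftover shards
		}
		groups = append(groups, shardIds[offset:offset+n])
		offset += n
	}
	return groups
}

func main() {
	// 14 EC shards over 4 destinations -> groups of 4, 4, 3, 3
	for i, g := range splitShards(14, 4) {
		fmt.Printf("destination %d gets shards %v\n", i, g)
	}
}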