Diffstat (limited to 'weed/worker/tasks/erasure_coding')
-rw-r--r--  weed/worker/tasks/erasure_coding/config.go        207
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go     140
-rw-r--r--  weed/worker/tasks/erasure_coding/ec.go            792
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_detector.go   139
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_register.go   109
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_scheduler.go  114
-rw-r--r--  weed/worker/tasks/erasure_coding/ui.go            309
7 files changed, 1146 insertions, 664 deletions
diff --git a/weed/worker/tasks/erasure_coding/config.go b/weed/worker/tasks/erasure_coding/config.go
new file mode 100644
index 000000000..1f70fb8db
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/config.go
@@ -0,0 +1,207 @@
+package erasure_coding
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with erasure coding specific settings
+type Config struct {
+ base.BaseConfig
+ QuietForSeconds int `json:"quiet_for_seconds"`
+ FullnessRatio float64 `json:"fullness_ratio"`
+ CollectionFilter string `json:"collection_filter"`
+ MinSizeMB int `json:"min_size_mb"`
+}
+
+// NewDefaultConfig creates a new default erasure coding configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 60 * 60, // 1 hour
+ MaxConcurrent: 1,
+ },
+ QuietForSeconds: 300, // 5 minutes
+ FullnessRatio: 0.8, // 80%
+ CollectionFilter: "",
+ MinSizeMB: 30, // 30MB (more reasonable than 100MB)
+ }
+}
+
+// GetConfigSpec returns the configuration schema for erasure coding tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Erasure Coding Tasks",
+ Description: "Whether erasure coding tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic erasure coding task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 60 * 60,
+ MinValue: 10 * 60,
+ MaxValue: 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing erasure coding",
+ HelpText: "The system will check for volumes that need erasure coding at this interval",
+ Placeholder: "1",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 1,
+ MinValue: 1,
+ MaxValue: 5,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of erasure coding tasks that can run simultaneously",
+ HelpText: "Limits the number of erasure coding operations running at the same time",
+ Placeholder: "1 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "quiet_for_seconds",
+ JSONName: "quiet_for_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 300,
+ MinValue: 60,
+ MaxValue: 3600,
+ Required: true,
+ DisplayName: "Quiet Period",
+ Description: "Minimum time volume must be quiet before erasure coding",
+ HelpText: "Volume must not be modified for this duration before erasure coding",
+ Placeholder: "5",
+ Unit: config.UnitMinutes,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "fullness_ratio",
+ JSONName: "fullness_ratio",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.8,
+ MinValue: 0.1,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Fullness Ratio",
+ Description: "Minimum fullness ratio to trigger erasure coding",
+ HelpText: "Only volumes with this fullness ratio or higher will be erasure coded",
+ Placeholder: "0.80 (80%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "collection_filter",
+ JSONName: "collection_filter",
+ Type: config.FieldTypeString,
+ DefaultValue: "",
+ Required: false,
+ DisplayName: "Collection Filter",
+ Description: "Only process volumes from specific collections",
+ HelpText: "Leave empty to process all collections, or specify collection name",
+ Placeholder: "my_collection",
+ InputType: "text",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_size_mb",
+ JSONName: "min_size_mb",
+ Type: config.FieldTypeInt,
+ DefaultValue: 30,
+ MinValue: 1,
+ MaxValue: 1000,
+ Required: true,
+ DisplayName: "Minimum Size (MB)",
+ Description: "Minimum volume size to consider for erasure coding",
+ HelpText: "Only volumes larger than this size will be considered for erasure coding",
+ Placeholder: "30",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
+
+// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
+func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: float64(c.FullnessRatio),
+ QuietForSeconds: int32(c.QuietForSeconds),
+ MinVolumeSizeMb: int32(c.MinSizeMB),
+ CollectionFilter: c.CollectionFilter,
+ },
+ },
+ }
+}
+
+// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
+func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+
+ // Set general TaskPolicy fields
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
+
+ // Set erasure coding-specific fields from the task config
+ if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil {
+ c.FullnessRatio = float64(ecConfig.FullnessRatio)
+ c.QuietForSeconds = int(ecConfig.QuietForSeconds)
+ c.MinSizeMB = int(ecConfig.MinVolumeSizeMb)
+ c.CollectionFilter = ecConfig.CollectionFilter
+ }
+
+ return nil
+}
+
+// LoadConfigFromPersistence loads configuration from the persistence layer if available
+func LoadConfigFromPersistence(configPersistence interface{}) *Config {
+ config := NewDefaultConfig()
+
+ // Try to load from persistence if available
+ if persistence, ok := configPersistence.(interface {
+ LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error)
+ }); ok {
+ if policy, err := persistence.LoadErasureCodingTaskPolicy(); err == nil && policy != nil {
+ if err := config.FromTaskPolicy(policy); err == nil {
+ glog.V(1).Infof("Loaded erasure coding configuration from persistence")
+ return config
+ }
+ }
+ }
+
+ glog.V(1).Infof("Using default erasure coding configuration")
+ return config
+}
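For reference, a minimal round-trip sketch of the ToTaskPolicy/FromTaskPolicy pair above; it uses only the exported names from config.go, and the import alias and literal values are illustrative:

package main

import (
	"fmt"

	ec "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
)

func main() {
	// Start from the defaults and tweak a couple of fields.
	cfg := ec.NewDefaultConfig()
	cfg.MinSizeMB = 64
	cfg.CollectionFilter = "photos"

	// Serialize to the protobuf policy consumed by the admin/persistence layer.
	policy := cfg.ToTaskPolicy()

	// Load it back into a fresh Config; the EC-specific fields are restored
	// from the embedded ErasureCodingTaskConfig.
	restored := ec.NewDefaultConfig()
	if err := restored.FromTaskPolicy(policy); err != nil {
		panic(err)
	}
	fmt.Println(restored.MinSizeMB, restored.CollectionFilter) // 64 photos
}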
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
new file mode 100644
index 000000000..1a2558396
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -0,0 +1,140 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for erasure coding tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ ecConfig := config.(*Config)
+ var results []*types.TaskDetectionResult
+ now := time.Now()
+ quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
+ minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum
+
+ skippedAlreadyEC := 0
+ skippedTooSmall := 0
+ skippedCollectionFilter := 0
+ skippedQuietTime := 0
+ skippedFullness := 0
+
+ for _, metric := range metrics {
+ // Skip if already EC volume
+ if metric.IsECVolume {
+ skippedAlreadyEC++
+ continue
+ }
+
+ // Check minimum size requirement
+ if metric.Size < minSizeBytes {
+ skippedTooSmall++
+ continue
+ }
+
+ // Check collection filter if specified
+ if ecConfig.CollectionFilter != "" {
+ // Parse comma-separated collections
+ allowedCollections := make(map[string]bool)
+ for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
+ allowedCollections[strings.TrimSpace(collection)] = true
+ }
+ // Skip if volume's collection is not in the allowed list
+ if !allowedCollections[metric.Collection] {
+ skippedCollectionFilter++
+ continue
+ }
+ }
+
+ // Check quiet duration and fullness criteria
+ if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeErasureCoding,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: types.TaskPriorityLow, // EC is not urgent
+ Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
+ metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
+ float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
+ ScheduleAt: now,
+ }
+ results = append(results, result)
+ } else {
+ // Track why this volume was skipped for the summary log below
+ if metric.Age < quietThreshold {
+ skippedQuietTime++
+ }
+ if metric.FullnessRatio < ecConfig.FullnessRatio {
+ skippedFullness++
+ }
+ }
+ }
+
+ // Log debug summary if no tasks were created
+ if len(results) == 0 && len(metrics) > 0 {
+ totalVolumes := len(metrics)
+ glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
+ totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness)
+
+ // Show details for first few volumes
+ for i, metric := range metrics {
+ if i >= 3 || metric.IsECVolume { // Only detail the first few entries, skipping EC volumes
+ continue
+ }
+ sizeMB := float64(metric.Size) / (1024 * 1024)
+ glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
+ metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute),
+ metric.FullnessRatio*100, ecConfig.FullnessRatio*100)
+ }
+ }
+
+ return results, nil
+}
+
+// Scheduling implements the scheduling logic for erasure coding tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ ecConfig := config.(*Config)
+
+ // Check if we have available workers
+ if len(availableWorkers) == 0 {
+ return false
+ }
+
+ // Count running EC tasks
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeErasureCoding {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningCount >= ecConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check if any worker can handle EC tasks
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeErasureCoding {
+ return true
+ }
+ }
+ }
+
+ return false
+}
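For reference, a hedged sketch of driving the Detection hook directly (e.g. from a test). The VolumeHealthMetrics field names are taken from the code above; the literal values are illustrative, and clusterInfo may be nil because Detection does not read it:

package main

import (
	"fmt"
	"time"

	ec "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func main() {
	cfg := ec.NewDefaultConfig() // enabled, 300s quiet period, 0.8 fullness, 30MB minimum

	metrics := []*types.VolumeHealthMetrics{{
		VolumeID:      101,
		Server:        "10.0.0.5:8080",
		Collection:    "photos",
		Size:          512 * 1024 * 1024, // well above MinSizeMB
		Age:           10 * time.Minute,  // longer than the quiet period
		FullnessRatio: 0.92,              // above the fullness threshold
	}}

	results, err := ec.Detection(metrics, nil, cfg)
	fmt.Println(len(results), err) // expect: 1 <nil>
}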
diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go
index 641dfc6b5..8dc7a1cd0 100644
--- a/weed/worker/tasks/erasure_coding/ec.go
+++ b/weed/worker/tasks/erasure_coding/ec.go
@@ -1,79 +1,785 @@
package erasure_coding
import (
+ "context"
"fmt"
+ "io"
+ "math"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
)
-// Task implements erasure coding operation to convert volumes to EC format
+// Task implements comprehensive erasure coding with protobuf parameters
type Task struct {
- *tasks.BaseTask
- server string
- volumeID uint32
+ *base.BaseTypedTask
+
+ // Current task state
+ sourceServer string
+ volumeID uint32
+ collection string
+ workDir string
+ masterClient string
+ grpcDialOpt grpc.DialOption
+
+ // EC parameters from protobuf
+ destinations []*worker_pb.ECDestination // Disk-aware destinations
+ existingShardLocations []*worker_pb.ExistingECShardLocation // Existing shards to cleanup
+ estimatedShardSize uint64
+ dataShards int
+ parityShards int
+ cleanupSource bool
+
+ // Progress tracking
+ currentStep string
+ stepProgress map[string]float64
}
-// NewTask creates a new erasure coding task instance
-func NewTask(server string, volumeID uint32) *Task {
+// NewTask creates a new erasure coding task
+func NewTask() types.TypedTaskInterface {
task := &Task{
- BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding),
- server: server,
- volumeID: volumeID,
+ BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeErasureCoding),
+ masterClient: "localhost:9333", // Default master client
+ workDir: "/tmp/seaweedfs_ec_work", // Default work directory
+ grpcDialOpt: grpc.WithTransportCredentials(insecure.NewCredentials()), // Default to insecure
+ dataShards: erasure_coding.DataShardsCount, // Use package constant
+ parityShards: erasure_coding.ParityShardsCount, // Use package constant
+ stepProgress: make(map[string]float64),
}
return task
}
-// Execute executes the erasure coding task
-func (t *Task) Execute(params types.TaskParams) error {
- glog.Infof("Starting erasure coding task for volume %d on server %s", t.volumeID, t.server)
+// ValidateTyped validates the typed parameters for EC task
+func (t *Task) ValidateTyped(params *worker_pb.TaskParams) error {
+ // Basic validation from base class
+ if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
+ return err
+ }
+
+ // Check that we have EC-specific parameters
+ ecParams := params.GetErasureCodingParams()
+ if ecParams == nil {
+ return fmt.Errorf("erasure_coding_params is required for EC task")
+ }
+
+ // Require destinations
+ if len(ecParams.Destinations) == 0 {
+ return fmt.Errorf("destinations must be specified for EC task")
+ }
+
+ // DataShards and ParityShards are constants from erasure_coding package
+ expectedDataShards := int32(erasure_coding.DataShardsCount)
+ expectedParityShards := int32(erasure_coding.ParityShardsCount)
+
+ if ecParams.DataShards > 0 && ecParams.DataShards != expectedDataShards {
+ return fmt.Errorf("data_shards must be %d (fixed constant), got %d", expectedDataShards, ecParams.DataShards)
+ }
+ if ecParams.ParityShards > 0 && ecParams.ParityShards != expectedParityShards {
+ return fmt.Errorf("parity_shards must be %d (fixed constant), got %d", expectedParityShards, ecParams.ParityShards)
+ }
+
+ // Validate destination count
+ destinationCount := len(ecParams.Destinations)
+ totalShards := expectedDataShards + expectedParityShards
+ if totalShards > int32(destinationCount) {
+ return fmt.Errorf("insufficient destinations: need %d, have %d", totalShards, destinationCount)
+ }
+
+ return nil
+}
+
+// EstimateTimeTyped estimates the time needed for EC processing based on protobuf parameters
+func (t *Task) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
+ baseTime := 20 * time.Minute // Processing takes time due to comprehensive operations
+
+ ecParams := params.GetErasureCodingParams()
+ if ecParams != nil && ecParams.EstimatedShardSize > 0 {
+ // More accurate estimate based on shard size
+ // Account for copying, encoding, and distribution
+ gbSize := ecParams.EstimatedShardSize / (1024 * 1024 * 1024)
+ estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB
+ if estimatedTime > baseTime {
+ return estimatedTime
+ }
+ }
+
+ return baseTime
+}
+
+// ExecuteTyped implements the actual erasure coding workflow with typed parameters
+func (t *Task) ExecuteTyped(params *worker_pb.TaskParams) error {
+ // Extract basic parameters
+ t.volumeID = params.VolumeId
+ t.sourceServer = params.Server
+ t.collection = params.Collection
- // Simulate erasure coding operation with progress updates
- steps := []struct {
- name string
- duration time.Duration
- progress float64
- }{
- {"Analyzing volume", 2 * time.Second, 15},
- {"Creating EC shards", 5 * time.Second, 50},
- {"Verifying shards", 2 * time.Second, 75},
- {"Finalizing EC volume", 1 * time.Second, 100},
+ // Extract EC-specific parameters
+ ecParams := params.GetErasureCodingParams()
+ if ecParams != nil {
+ t.destinations = ecParams.Destinations // Store disk-aware destinations
+ t.existingShardLocations = ecParams.ExistingShardLocations // Store existing shards for cleanup
+ t.estimatedShardSize = ecParams.EstimatedShardSize
+ t.cleanupSource = ecParams.CleanupSource
+
+ // DataShards and ParityShards are constants, don't override from parameters
+ // t.dataShards and t.parityShards are already set to constants in NewTask
+
+ if ecParams.WorkingDir != "" {
+ t.workDir = ecParams.WorkingDir
+ }
+ if ecParams.MasterClient != "" {
+ t.masterClient = ecParams.MasterClient
+ }
}
- for _, step := range steps {
- if t.IsCancelled() {
- return fmt.Errorf("erasure coding task cancelled")
+ // Determine available destinations for logging
+ var availableDestinations []string
+ for _, dest := range t.destinations {
+ availableDestinations = append(availableDestinations, fmt.Sprintf("%s(disk:%d)", dest.Node, dest.DiskId))
+ }
+
+ glog.V(1).Infof("Starting EC task for volume %d: %s -> %v (data:%d, parity:%d)",
+ t.volumeID, t.sourceServer, availableDestinations, t.dataShards, t.parityShards)
+
+ // Create unique working directory for this task
+ taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
+ if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
+ return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
+ }
+ glog.V(1).Infof("WORKFLOW: Created working directory: %s", taskWorkDir)
+
+ // Ensure cleanup of working directory
+ defer func() {
+ if err := os.RemoveAll(taskWorkDir); err != nil {
+ glog.Warningf("Failed to cleanup working directory %s: %v", taskWorkDir, err)
+ } else {
+ glog.V(1).Infof("WORKFLOW: Cleaned up working directory: %s", taskWorkDir)
}
+ }()
+
+ // Step 1: Collect volume locations from master
+ glog.V(1).Infof("WORKFLOW STEP 1: Collecting volume locations from master")
+ t.SetProgress(5.0)
+ volumeId := needle.VolumeId(t.volumeID)
+ volumeLocations, err := t.collectVolumeLocations(volumeId)
+ if err != nil {
+ return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Found volume %d on %d servers: %v", t.volumeID, len(volumeLocations), volumeLocations)
- glog.V(1).Infof("Erasure coding task step: %s", step.name)
- t.SetProgress(step.progress)
+ // Convert ServerAddress slice to string slice
+ var locationStrings []string
+ for _, addr := range volumeLocations {
+ locationStrings = append(locationStrings, string(addr))
+ }
- // Simulate work
- time.Sleep(step.duration)
+ // Step 2: Check if volume has sufficient size for EC encoding
+ if !t.shouldPerformECEncoding(locationStrings) {
+ glog.Infof("Volume %d does not meet EC encoding criteria, skipping", t.volumeID)
+ t.SetProgress(100.0)
+ return nil
}
- glog.Infof("Erasure coding task completed for volume %d on server %s", t.volumeID, t.server)
+ // Step 2A: Cleanup existing EC shards if any
+ glog.V(1).Infof("WORKFLOW STEP 2A: Cleaning up existing EC shards for volume %d", t.volumeID)
+ t.SetProgress(10.0)
+ err = t.cleanupExistingEcShards()
+ if err != nil {
+ glog.Warningf("Failed to cleanup existing EC shards (continuing anyway): %v", err)
+ // Don't fail the task - this is just cleanup
+ }
+ glog.V(1).Infof("WORKFLOW: Existing EC shards cleanup completed for volume %d", t.volumeID)
+
+ // Step 2B: Mark volume readonly on all replica servers
+ glog.V(1).Infof("WORKFLOW STEP 2B: Marking volume %d readonly on all replica servers", t.volumeID)
+ t.SetProgress(15.0)
+ err = t.markVolumeReadonlyOnAllReplicas(needle.VolumeId(t.volumeID), locationStrings)
+ if err != nil {
+ return fmt.Errorf("failed to mark volume readonly: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Volume %d marked readonly on all replicas", t.volumeID)
+
+ // Step 3: Copy volume files (.dat, .idx) to the EC worker
+ glog.V(1).Infof("WORKFLOW STEP 3: Copying volume files from source server %s to EC worker", t.sourceServer)
+ t.SetProgress(25.0)
+ localVolumeFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to copy volume files to EC worker: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Volume files copied to EC worker: %v", localVolumeFiles)
+
+ // Step 4: Generate EC shards locally on the EC worker
+ glog.V(1).Infof("WORKFLOW STEP 4: Generating EC shards locally on EC worker")
+ t.SetProgress(40.0)
+ localShardFiles, err := t.generateEcShardsLocally(localVolumeFiles, taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to generate EC shards locally: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: EC shards generated locally: %d shard files", len(localShardFiles))
+
+ // Step 5: Distribute shards from the EC worker to destination servers
+ glog.V(1).Infof("WORKFLOW STEP 5: Distributing EC shards from worker to destination servers")
+ t.SetProgress(60.0)
+ err = t.distributeEcShardsFromWorker(localShardFiles)
+ if err != nil {
+ return fmt.Errorf("failed to distribute EC shards from worker: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: EC shards distributed to all destination servers")
+
+ // Step 6: Mount EC shards on destination servers
+ glog.V(1).Infof("WORKFLOW STEP 6: Mounting EC shards on destination servers")
+ t.SetProgress(80.0)
+ err = t.mountEcShardsOnDestinations()
+ if err != nil {
+ return fmt.Errorf("failed to mount EC shards: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: EC shards mounted successfully")
+
+ // Step 7: Delete original volume from all locations
+ glog.V(1).Infof("WORKFLOW STEP 7: Deleting original volume %d from all replica servers", t.volumeID)
+ t.SetProgress(90.0)
+ err = t.deleteVolumeFromAllLocations(needle.VolumeId(t.volumeID), locationStrings)
+ if err != nil {
+ return fmt.Errorf("failed to delete original volume: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Original volume %d deleted from all locations", t.volumeID)
+
+ t.SetProgress(100.0)
+ glog.Infof("EC task completed successfully for volume %d", t.volumeID)
return nil
}
-// Validate validates the task parameters
-func (t *Task) Validate(params types.TaskParams) error {
- if params.VolumeID == 0 {
- return fmt.Errorf("volume_id is required")
+// collectVolumeLocations gets volume location from master (placeholder implementation)
+func (t *Task) collectVolumeLocations(volumeId needle.VolumeId) ([]pb.ServerAddress, error) {
+ // For now, return a placeholder implementation
+ // Full implementation would call master to get volume locations
+ return []pb.ServerAddress{pb.ServerAddress(t.sourceServer)}, nil
+}
+
+// cleanupExistingEcShards deletes existing EC shards using planned locations
+func (t *Task) cleanupExistingEcShards() error {
+ if len(t.existingShardLocations) == 0 {
+ glog.V(1).Infof("No existing EC shards to cleanup for volume %d", t.volumeID)
+ return nil
}
- if params.Server == "" {
- return fmt.Errorf("server is required")
+
+ glog.V(1).Infof("Cleaning up existing EC shards for volume %d on %d servers", t.volumeID, len(t.existingShardLocations))
+
+ // Delete existing shards from each location using planned shard locations
+ for _, location := range t.existingShardLocations {
+ if len(location.ShardIds) == 0 {
+ continue
+ }
+
+ glog.V(1).Infof("Deleting existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(location.Node), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
+ VolumeId: t.volumeID,
+ Collection: t.collection,
+ ShardIds: location.ShardIds,
+ })
+ return deleteErr
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to delete existing EC shards %v from %s for volume %d: %v", location.ShardIds, location.Node, t.volumeID, err)
+ // Continue with other servers - don't fail the entire cleanup
+ } else {
+ glog.V(1).Infof("Successfully deleted existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID)
+ }
}
+
+ glog.V(1).Infof("Completed cleanup of existing EC shards for volume %d", t.volumeID)
return nil
}
-// EstimateTime estimates the time needed for the task
-func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
- // Base time for erasure coding operation
- baseTime := 30 * time.Second
+// shouldPerformECEncoding checks if the volume meets criteria for EC encoding
+func (t *Task) shouldPerformECEncoding(volumeLocations []string) bool {
+ // For now, always proceed with EC encoding if volume exists
+ // This can be extended with volume size checks, etc.
+ return len(volumeLocations) > 0
+}
- // Could adjust based on volume size or other factors
- return baseTime
+// markVolumeReadonlyOnAllReplicas marks the volume as readonly on all replica servers
+func (t *Task) markVolumeReadonlyOnAllReplicas(volumeId needle.VolumeId, volumeLocations []string) error {
+ glog.V(1).Infof("Marking volume %d readonly on %d servers", volumeId, len(volumeLocations))
+
+ // Mark volume readonly on all replica servers
+ for _, location := range volumeLocations {
+ glog.V(1).Infof("Marking volume %d readonly on %s", volumeId, location)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, markErr := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
+ VolumeId: uint32(volumeId),
+ })
+ return markErr
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to mark volume %d readonly on %s: %v", volumeId, location, err)
+ return fmt.Errorf("failed to mark volume %d readonly on %s: %v", volumeId, location, err)
+ }
+
+ glog.V(1).Infof("Successfully marked volume %d readonly on %s", volumeId, location)
+ }
+
+ glog.V(1).Infof("Successfully marked volume %d readonly on all %d locations", volumeId, len(volumeLocations))
+ return nil
+}
+
+// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker
+func (t *Task) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
+ localFiles := make(map[string]string)
+
+ // Copy .dat file
+ datFile := fmt.Sprintf("%s.dat", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID)))
+ err := t.copyFileFromSource(".dat", datFile)
+ if err != nil {
+ return nil, fmt.Errorf("failed to copy .dat file: %v", err)
+ }
+ localFiles["dat"] = datFile
+ glog.V(1).Infof("Copied .dat file to: %s", datFile)
+
+ // Copy .idx file
+ idxFile := fmt.Sprintf("%s.idx", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID)))
+ err = t.copyFileFromSource(".idx", idxFile)
+ if err != nil {
+ return nil, fmt.Errorf("failed to copy .idx file: %v", err)
+ }
+ localFiles["idx"] = idxFile
+ glog.V(1).Infof("Copied .idx file to: %s", idxFile)
+
+ return localFiles, nil
+}
+
+// copyFileFromSource copies a file from source server to local path using gRPC streaming
+func (t *Task) copyFileFromSource(ext, localPath string) error {
+ return operation.WithVolumeServerClient(false, pb.ServerAddress(t.sourceServer), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: t.volumeID,
+ Collection: t.collection,
+ Ext: ext,
+ StopOffset: uint64(math.MaxInt64),
+ })
+ if err != nil {
+ return fmt.Errorf("failed to initiate file copy: %v", err)
+ }
+
+ // Create local file
+ localFile, err := os.Create(localPath)
+ if err != nil {
+ return fmt.Errorf("failed to create local file %s: %v", localPath, err)
+ }
+ defer localFile.Close()
+
+ // Stream data and write to local file
+ totalBytes := int64(0)
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("failed to receive file data: %v", err)
+ }
+
+ if len(resp.FileContent) > 0 {
+ written, writeErr := localFile.Write(resp.FileContent)
+ if writeErr != nil {
+ return fmt.Errorf("failed to write to local file: %v", writeErr)
+ }
+ totalBytes += int64(written)
+ }
+ }
+
+ glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.sourceServer, localPath)
+ return nil
+ })
+}
+
+// generateEcShardsLocally generates EC shards from local volume files
+func (t *Task) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
+ datFile := localFiles["dat"]
+ idxFile := localFiles["idx"]
+
+ if datFile == "" || idxFile == "" {
+ return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
+ }
+
+ // Get base name without extension for EC operations
+ baseName := strings.TrimSuffix(datFile, ".dat")
+
+ shardFiles := make(map[string]string)
+
+ glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
+
+ // Generate EC shard files (.ec00 ~ .ec13)
+ if err := erasure_coding.WriteEcFiles(baseName); err != nil {
+ return nil, fmt.Errorf("failed to generate EC shard files: %v", err)
+ }
+
+ // Generate .ecx file from .idx
+ if err := erasure_coding.WriteSortedFileFromIdx(idxFile, ".ecx"); err != nil {
+ return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
+ }
+
+ // Collect generated shard file paths
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
+ if _, err := os.Stat(shardFile); err == nil {
+ shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile
+ }
+ }
+
+ // Add metadata files
+ ecxFile := idxFile + ".ecx"
+ if _, err := os.Stat(ecxFile); err == nil {
+ shardFiles["ecx"] = ecxFile
+ }
+
+ // Generate .vif file (volume info)
+ vifFile := baseName + ".vif"
+ // Create basic volume info - in a real implementation, this would come from the original volume
+ volumeInfo := &volume_server_pb.VolumeInfo{
+ Version: uint32(needle.GetCurrentVersion()),
+ }
+ if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
+ glog.Warningf("Failed to create .vif file: %v", err)
+ } else {
+ shardFiles["vif"] = vifFile
+ }
+
+ glog.V(1).Infof("Generated %d EC files locally", len(shardFiles))
+ return shardFiles, nil
+}
+
+// copyEcShardsToDestinations has the destination servers pull EC shards from the source server via VolumeEcShardsCopy
+func (t *Task) copyEcShardsToDestinations() error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for EC shard distribution")
+ }
+
+ destinations := t.destinations
+
+ glog.V(1).Infof("Copying EC shards for volume %d to %d destinations", t.volumeID, len(destinations))
+
+ // Prepare shard IDs (0-13 for EC shards)
+ var shardIds []uint32
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardIds = append(shardIds, uint32(i))
+ }
+
+ // Distribute shards across destinations
+ var wg sync.WaitGroup
+ errorChan := make(chan error, len(destinations))
+
+ // Track which disks have already received metadata files (server+disk)
+ metadataFilesCopied := make(map[string]bool)
+ var metadataMutex sync.Mutex
+
+ // For each destination, copy a subset of shards
+ shardsPerDest := len(shardIds) / len(destinations)
+ remainder := len(shardIds) % len(destinations)
+
+ shardOffset := 0
+ for i, dest := range destinations {
+ wg.Add(1)
+
+ shardsForThisDest := shardsPerDest
+ if i < remainder {
+ shardsForThisDest++ // Distribute remainder shards
+ }
+
+ destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
+ shardOffset += shardsForThisDest
+
+ go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
+ defer wg.Done()
+
+ if t.IsCancelled() {
+ errorChan <- fmt.Errorf("task cancelled during shard copy")
+ return
+ }
+
+ // Create disk-specific metadata key (server+disk)
+ diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
+
+ glog.V(1).Infof("Copying shards %v from %s to %s (disk %d)",
+ targetShardIds, t.sourceServer, destination.Node, destination.DiskId)
+
+ // Check if this disk needs metadata files (only once per disk)
+ metadataMutex.Lock()
+ needsMetadataFiles := !metadataFilesCopied[diskKey]
+ if needsMetadataFiles {
+ metadataFilesCopied[diskKey] = true
+ }
+ metadataMutex.Unlock()
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
+ VolumeId: uint32(t.volumeID),
+ Collection: t.collection,
+ ShardIds: targetShardIds,
+ CopyEcxFile: needsMetadataFiles, // Copy .ecx only once per disk
+ CopyEcjFile: needsMetadataFiles, // Copy .ecj only once per disk
+ CopyVifFile: needsMetadataFiles, // Copy .vif only once per disk
+ SourceDataNode: t.sourceServer,
+ DiskId: destination.DiskId, // Pass target disk ID
+ })
+ return copyErr
+ })
+
+ if err != nil {
+ errorChan <- fmt.Errorf("failed to copy shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
+ return
+ }
+
+ if needsMetadataFiles {
+ glog.V(1).Infof("Successfully copied shards %v and metadata files (.ecx, .ecj, .vif) to %s disk %d",
+ targetShardIds, destination.Node, destination.DiskId)
+ } else {
+ glog.V(1).Infof("Successfully copied shards %v to %s disk %d (metadata files already present)",
+ targetShardIds, destination.Node, destination.DiskId)
+ }
+ }(dest, destShardIds)
+ }
+
+ wg.Wait()
+ close(errorChan)
+
+ // Check for any copy errors
+ if err := <-errorChan; err != nil {
+ return err
+ }
+
+ glog.V(1).Infof("Successfully copied all EC shards for volume %d", t.volumeID)
+ return nil
+}
+
+// distributeEcShardsFromWorker distributes locally generated EC shards to destination servers
+func (t *Task) distributeEcShardsFromWorker(localShardFiles map[string]string) error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for EC shard distribution")
+ }
+
+ destinations := t.destinations
+
+ glog.V(1).Infof("Distributing EC shards for volume %d from worker to %d destinations", t.volumeID, len(destinations))
+
+ // Prepare shard IDs (0-13 for EC shards)
+ var shardIds []uint32
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardIds = append(shardIds, uint32(i))
+ }
+
+ // Distribute shards across destinations
+ var wg sync.WaitGroup
+ errorChan := make(chan error, len(destinations))
+
+ // Track which disks have already received metadata files (server+disk)
+ metadataFilesCopied := make(map[string]bool)
+ var metadataMutex sync.Mutex
+
+ // For each destination, send a subset of shards
+ shardsPerDest := len(shardIds) / len(destinations)
+ remainder := len(shardIds) % len(destinations)
+
+ shardOffset := 0
+ for i, dest := range destinations {
+ wg.Add(1)
+
+ shardsForThisDest := shardsPerDest
+ if i < remainder {
+ shardsForThisDest++ // Distribute remainder shards
+ }
+
+ destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
+ shardOffset += shardsForThisDest
+
+ go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
+ defer wg.Done()
+
+ if t.IsCancelled() {
+ errorChan <- fmt.Errorf("task cancelled during shard distribution")
+ return
+ }
+
+ // Create disk-specific metadata key (server+disk)
+ diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
+
+ glog.V(1).Infof("Distributing shards %v from worker to %s (disk %d)",
+ targetShardIds, destination.Node, destination.DiskId)
+
+ // Check if this disk needs metadata files (only once per disk)
+ metadataMutex.Lock()
+ needsMetadataFiles := !metadataFilesCopied[diskKey]
+ if needsMetadataFiles {
+ metadataFilesCopied[diskKey] = true
+ }
+ metadataMutex.Unlock()
+
+ // Send shard files to destination using HTTP upload (simplified for now)
+ err := t.sendShardsToDestination(destination, targetShardIds, localShardFiles, needsMetadataFiles)
+ if err != nil {
+ errorChan <- fmt.Errorf("failed to send shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
+ return
+ }
+
+ if needsMetadataFiles {
+ glog.V(1).Infof("Successfully distributed shards %v and metadata files (.ecx, .vif) to %s disk %d",
+ targetShardIds, destination.Node, destination.DiskId)
+ } else {
+ glog.V(1).Infof("Successfully distributed shards %v to %s disk %d (metadata files already present)",
+ targetShardIds, destination.Node, destination.DiskId)
+ }
+ }(dest, destShardIds)
+ }
+
+ wg.Wait()
+ close(errorChan)
+
+ // Check for any distribution errors
+ if err := <-errorChan; err != nil {
+ return err
+ }
+
+ glog.V(1).Infof("Completed distributing EC shards for volume %d", t.volumeID)
+ return nil
+}
+
+// sendShardsToDestination sends specific shard files from worker to a destination server (simplified)
+func (t *Task) sendShardsToDestination(destination *worker_pb.ECDestination, shardIds []uint32, localFiles map[string]string, includeMetadata bool) error {
+ // Placeholder: no data is transferred yet; the TODO below describes the intended upload path
+ glog.V(2).Infof("Would send shards %v and metadata=%v to %s disk %d", shardIds, includeMetadata, destination.Node, destination.DiskId)
+
+ // TODO: Implement actual file upload to volume server
+ // This is a placeholder - actual implementation would:
+ // 1. Open each shard file locally
+ // 2. Upload via HTTP POST or gRPC stream to destination volume server
+ // 3. Volume server would save to the specified disk_id
+
+ return nil
+}
+
+// mountEcShardsOnDestinations mounts EC shards on all destination servers
+func (t *Task) mountEcShardsOnDestinations() error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for mounting EC shards")
+ }
+
+ destinations := t.destinations
+
+ glog.V(1).Infof("Mounting EC shards for volume %d on %d destinations", t.volumeID, len(destinations))
+
+ // Prepare all shard IDs (0-13)
+ var allShardIds []uint32
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ allShardIds = append(allShardIds, uint32(i))
+ }
+
+ var wg sync.WaitGroup
+ errorChan := make(chan error, len(destinations))
+
+ // Mount shards on each destination server
+ for _, dest := range destinations {
+ wg.Add(1)
+
+ go func(destination *worker_pb.ECDestination) {
+ defer wg.Done()
+
+ if t.IsCancelled() {
+ errorChan <- fmt.Errorf("task cancelled during shard mounting")
+ return
+ }
+
+ glog.V(1).Infof("Mounting EC shards on %s disk %d", destination.Node, destination.DiskId)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+ VolumeId: uint32(t.volumeID),
+ Collection: t.collection,
+ ShardIds: allShardIds, // Mount all available shards on each server
+ })
+ return mountErr
+ })
+
+ if err != nil {
+ // It's normal for some servers to not have all shards, so log as warning rather than error
+ glog.Warningf("Failed to mount some shards on %s disk %d (this may be normal): %v", destination.Node, destination.DiskId, err)
+ } else {
+ glog.V(1).Infof("Successfully mounted EC shards on %s disk %d", destination.Node, destination.DiskId)
+ }
+ }(dest)
+ }
+
+ wg.Wait()
+ close(errorChan)
+
+ // Check for any critical mounting errors
+ select {
+ case err := <-errorChan:
+ if err != nil {
+ glog.Warningf("Some shard mounting issues occurred: %v", err)
+ }
+ default:
+ // No errors
+ }
+
+ glog.V(1).Infof("Completed mounting EC shards for volume %d", t.volumeID)
+ return nil
+}
+
+// deleteVolumeFromAllLocations deletes the original volume from all replica servers
+func (t *Task) deleteVolumeFromAllLocations(volumeId needle.VolumeId, volumeLocations []string) error {
+ glog.V(1).Infof("Deleting original volume %d from %d locations", volumeId, len(volumeLocations))
+
+ for _, location := range volumeLocations {
+ glog.V(1).Infof("Deleting volume %d from %s", volumeId, location)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
+ VolumeId: uint32(volumeId),
+ OnlyEmpty: false, // Force delete even if not empty since we've already created EC shards
+ })
+ return deleteErr
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to delete volume %d from %s: %v", volumeId, location, err)
+ return fmt.Errorf("failed to delete volume %d from %s: %v", volumeId, location, err)
+ }
+
+ glog.V(1).Infof("Successfully deleted volume %d from %s", volumeId, location)
+ }
+
+ glog.V(1).Infof("Successfully deleted volume %d from all %d locations", volumeId, len(volumeLocations))
+ return nil
+}
+
+// Register the task in the global registry
+func init() {
+ types.RegisterGlobalTypedTask(types.TaskTypeErasureCoding, NewTask)
+ glog.V(1).Infof("Registered EC task")
}
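For reference, the shard fan-out in copyEcShardsToDestinations and distributeEcShardsFromWorker splits the 14 shard IDs as evenly as possible, with the first `remainder` destinations taking one extra shard. A standalone sketch of that arithmetic (the destination count of 4 is illustrative):

package main

import "fmt"

func main() {
	const totalShards = 14 // DataShardsCount (10) + ParityShardsCount (4)
	destinations := 4      // illustrative

	shardsPerDest := totalShards / destinations
	remainder := totalShards % destinations

	offset := 0
	for i := 0; i < destinations; i++ {
		n := shardsPerDest
		if i < remainder {
			n++ // the first `remainder` destinations take one extra shard
		}
		fmt.Printf("destination %d gets shard IDs %d..%d\n", i, offset, offset+n-1)
		offset += n
	}
	// Output:
	// destination 0 gets shard IDs 0..3
	// destination 1 gets shard IDs 4..7
	// destination 2 gets shard IDs 8..10
	// destination 3 gets shard IDs 11..13
}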
diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go
deleted file mode 100644
index 0f8b5e376..000000000
--- a/weed/worker/tasks/erasure_coding/ec_detector.go
+++ /dev/null
@@ -1,139 +0,0 @@
-package erasure_coding
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// EcDetector implements erasure coding task detection
-type EcDetector struct {
- enabled bool
- volumeAgeHours int
- fullnessRatio float64
- scanInterval time.Duration
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*EcDetector)(nil)
-)
-
-// NewEcDetector creates a new erasure coding detector
-func NewEcDetector() *EcDetector {
- return &EcDetector{
- enabled: false, // Conservative default
- volumeAgeHours: 24 * 7, // 1 week
- fullnessRatio: 0.9, // 90% full
- scanInterval: 2 * time.Hour,
- }
-}
-
-// GetTaskType returns the task type
-func (d *EcDetector) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// ScanForTasks scans for volumes that should be converted to erasure coding
-func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- return nil, nil
- }
-
- var results []*types.TaskDetectionResult
- now := time.Now()
- ageThreshold := time.Duration(d.volumeAgeHours) * time.Hour
-
- for _, metric := range volumeMetrics {
- // Skip if already EC volume
- if metric.IsECVolume {
- continue
- }
-
- // Check age and fullness criteria
- if metric.Age >= ageThreshold && metric.FullnessRatio >= d.fullnessRatio {
- // Check if volume is read-only (safe for EC conversion)
- if !metric.IsReadOnly {
- continue
- }
-
- result := &types.TaskDetectionResult{
- TaskType: types.TaskTypeErasureCoding,
- VolumeID: metric.VolumeID,
- Server: metric.Server,
- Collection: metric.Collection,
- Priority: types.TaskPriorityLow, // EC is not urgent
- Reason: "Volume is old and full enough for EC conversion",
- Parameters: map[string]interface{}{
- "age_hours": int(metric.Age.Hours()),
- "fullness_ratio": metric.FullnessRatio,
- },
- ScheduleAt: now,
- }
- results = append(results, result)
- }
- }
-
- glog.V(2).Infof("EC detector found %d tasks to schedule", len(results))
- return results, nil
-}
-
-// ScanInterval returns how often this task type should be scanned
-func (d *EcDetector) ScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// IsEnabled returns whether this task type is enabled
-func (d *EcDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// Configuration setters
-
-func (d *EcDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
-}
-
-func (d *EcDetector) SetVolumeAgeHours(hours int) {
- d.volumeAgeHours = hours
-}
-
-func (d *EcDetector) SetFullnessRatio(ratio float64) {
- d.fullnessRatio = ratio
-}
-
-func (d *EcDetector) SetScanInterval(interval time.Duration) {
- d.scanInterval = interval
-}
-
-// GetVolumeAgeHours returns the current volume age threshold in hours
-func (d *EcDetector) GetVolumeAgeHours() int {
- return d.volumeAgeHours
-}
-
-// GetFullnessRatio returns the current fullness ratio threshold
-func (d *EcDetector) GetFullnessRatio() float64 {
- return d.fullnessRatio
-}
-
-// GetScanInterval returns the scan interval
-func (d *EcDetector) GetScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// ConfigureFromPolicy configures the detector based on the maintenance policy
-func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
- // Type assert to the maintenance policy type we expect
- if maintenancePolicy, ok := policy.(interface {
- GetECEnabled() bool
- GetECVolumeAgeHours() int
- GetECFullnessRatio() float64
- }); ok {
- d.SetEnabled(maintenancePolicy.GetECEnabled())
- d.SetVolumeAgeHours(maintenancePolicy.GetECVolumeAgeHours())
- d.SetFullnessRatio(maintenancePolicy.GetECFullnessRatio())
- } else {
- glog.V(1).Infof("Could not configure EC detector from policy: unsupported policy type")
- }
-}
diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go
index 6c4b5bf7f..62cfe6b56 100644
--- a/weed/worker/tasks/erasure_coding/ec_register.go
+++ b/weed/worker/tasks/erasure_coding/ec_register.go
@@ -2,80 +2,71 @@ package erasure_coding
import (
"fmt"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
-// Factory creates erasure coding task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
+// Global variable to hold the task definition for configuration updates
+var globalTaskDef *base.TaskDefinition
-// NewFactory creates a new erasure coding task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeErasureCoding,
- []string{"erasure_coding", "storage", "durability"},
- "Convert volumes to erasure coded format for improved durability",
- ),
- }
-}
-
-// Create creates a new erasure coding task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID)
- task.SetEstimatedDuration(task.EstimateTime(params))
+// Auto-register this task when the package is imported
+func init() {
+ RegisterErasureCodingTask()
- return task, nil
+ // Register config updater
+ tasks.AutoRegisterConfigUpdater(types.TaskTypeErasureCoding, UpdateConfigFromPersistence)
}
-// Shared detector and scheduler instances
-var (
- sharedDetector *EcDetector
- sharedScheduler *Scheduler
-)
+// RegisterErasureCodingTask registers the erasure coding task with the new architecture
+func RegisterErasureCodingTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*EcDetector, *Scheduler) {
- if sharedDetector == nil {
- sharedDetector = NewEcDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewScheduler()
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeErasureCoding,
+ Name: "erasure_coding",
+ DisplayName: "Erasure Coding",
+ Description: "Applies erasure coding to volumes for data protection",
+ Icon: "fas fa-shield-alt text-success",
+ Capabilities: []string{"erasure_coding", "data_protection"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: nil, // Uses typed task system - see init() in ec.go
+ DetectionFunc: Detection,
+ ScanInterval: 1 * time.Hour,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 1,
+ RepeatInterval: 24 * time.Hour,
}
- return sharedDetector, sharedScheduler
-}
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*EcDetector, *Scheduler) {
- return getSharedInstances()
+ // Store task definition globally for configuration updates
+ globalTaskDef = taskDef
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
-// Auto-register this task when the package is imported
-func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeErasureCoding, factory)
+// UpdateConfigFromPersistence updates the erasure coding configuration from persistence
+func UpdateConfigFromPersistence(configPersistence interface{}) error {
+ if globalTaskDef == nil {
+ return fmt.Errorf("erasure coding task not registered")
+ }
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
+ // Load configuration from persistence
+ newConfig := LoadConfigFromPersistence(configPersistence)
+ if newConfig == nil {
+ return fmt.Errorf("failed to load configuration from persistence")
+ }
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
+ // Update the task definition's config
+ globalTaskDef.Config = newConfig
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ glog.V(1).Infof("Updated erasure coding task configuration from persistence")
+ return nil
}
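For reference, LoadConfigFromPersistence (config.go) and UpdateConfigFromPersistence only require a value exposing LoadErasureCodingTaskPolicy; a hedged test-double sketch (the fakePersistence type and the package name are made up for illustration):

package erasure_coding_test

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	ec "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
)

// fakePersistence satisfies the anonymous interface asserted by
// LoadConfigFromPersistence and returns a canned policy.
type fakePersistence struct {
	policy *worker_pb.TaskPolicy
}

func (f *fakePersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) {
	return f.policy, nil
}

func example() {
	p := &fakePersistence{policy: ec.NewDefaultConfig().ToTaskPolicy()}

	// Build a Config from the stored policy (defaults are used if loading fails).
	cfg := ec.LoadConfigFromPersistence(p)
	_ = cfg

	// Push the stored policy into the registered task definition.
	_ = ec.UpdateConfigFromPersistence(p)
}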
diff --git a/weed/worker/tasks/erasure_coding/ec_scheduler.go b/weed/worker/tasks/erasure_coding/ec_scheduler.go
deleted file mode 100644
index b2366bb06..000000000
--- a/weed/worker/tasks/erasure_coding/ec_scheduler.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package erasure_coding
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// Scheduler implements erasure coding task scheduling
-type Scheduler struct {
- maxConcurrent int
- enabled bool
-}
-
-// NewScheduler creates a new erasure coding scheduler
-func NewScheduler() *Scheduler {
- return &Scheduler{
- maxConcurrent: 1, // Conservative default
- enabled: false, // Conservative default
- }
-}
-
-// GetTaskType returns the task type
-func (s *Scheduler) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// CanScheduleNow determines if an erasure coding task can be scheduled now
-func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- if !s.enabled {
- return false
- }
-
- // Check if we have available workers
- if len(availableWorkers) == 0 {
- return false
- }
-
- // Count running EC tasks
- runningCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeErasureCoding {
- runningCount++
- }
- }
-
- // Check concurrency limit
- if runningCount >= s.maxConcurrent {
- glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent)
- return false
- }
-
- // Check if any worker can handle EC tasks
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeErasureCoding {
- glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID)
- return true
- }
- }
- }
-
- return false
-}
-
-// GetMaxConcurrent returns the maximum number of concurrent tasks
-func (s *Scheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks
-func (s *Scheduler) GetDefaultRepeatInterval() time.Duration {
- return 24 * time.Hour // Don't repeat EC for 24 hours
-}
-
-// GetPriority returns the priority for this task
-func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
- return types.TaskPriorityLow // EC is not urgent
-}
-
-// WasTaskRecentlyCompleted checks if a similar task was recently completed
-func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool {
- // Don't repeat EC for 24 hours
- interval := 24 * time.Hour
- cutoff := now.Add(-interval)
-
- for _, completedTask := range completedTasks {
- if completedTask.Type == types.TaskTypeErasureCoding &&
- completedTask.VolumeID == task.VolumeID &&
- completedTask.Server == task.Server &&
- completedTask.Status == types.TaskStatusCompleted &&
- completedTask.CompletedAt != nil &&
- completedTask.CompletedAt.After(cutoff) {
- return true
- }
- }
- return false
-}
-
-// IsEnabled returns whether this task type is enabled
-func (s *Scheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// Configuration setters
-
-func (s *Scheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
-}
-
-func (s *Scheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
-}
diff --git a/weed/worker/tasks/erasure_coding/ui.go b/weed/worker/tasks/erasure_coding/ui.go
deleted file mode 100644
index e17cba89a..000000000
--- a/weed/worker/tasks/erasure_coding/ui.go
+++ /dev/null
@@ -1,309 +0,0 @@
-package erasure_coding
-
-import (
- "fmt"
- "html/template"
- "strconv"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// UIProvider provides the UI for erasure coding task configuration
-type UIProvider struct {
- detector *EcDetector
- scheduler *Scheduler
-}
-
-// NewUIProvider creates a new erasure coding UI provider
-func NewUIProvider(detector *EcDetector, scheduler *Scheduler) *UIProvider {
- return &UIProvider{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetTaskType returns the task type
-func (ui *UIProvider) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// GetDisplayName returns the human-readable name
-func (ui *UIProvider) GetDisplayName() string {
- return "Erasure Coding"
-}
-
-// GetDescription returns a description of what this task does
-func (ui *UIProvider) GetDescription() string {
- return "Converts volumes to erasure coded format for improved data durability and fault tolerance"
-}
-
-// GetIcon returns the icon CSS class for this task type
-func (ui *UIProvider) GetIcon() string {
- return "fas fa-shield-alt text-info"
-}
-
-// ErasureCodingConfig represents the erasure coding configuration
-type ErasureCodingConfig struct {
- Enabled bool `json:"enabled"`
- VolumeAgeHoursSeconds int `json:"volume_age_hours_seconds"`
- FullnessRatio float64 `json:"fullness_ratio"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- ShardCount int `json:"shard_count"`
- ParityCount int `json:"parity_count"`
- CollectionFilter string `json:"collection_filter"`
-}
-
-// Helper functions for duration conversion
-func secondsToDuration(seconds int) time.Duration {
- return time.Duration(seconds) * time.Second
-}
-
-func durationToSeconds(d time.Duration) int {
- return int(d.Seconds())
-}
-
-// formatDurationForUser formats seconds as a user-friendly duration string
-func formatDurationForUser(seconds int) string {
- d := secondsToDuration(seconds)
- if d < time.Minute {
- return fmt.Sprintf("%ds", seconds)
- }
- if d < time.Hour {
- return fmt.Sprintf("%.0fm", d.Minutes())
- }
- if d < 24*time.Hour {
- return fmt.Sprintf("%.1fh", d.Hours())
- }
- return fmt.Sprintf("%.1fd", d.Hours()/24)
-}
-
-// RenderConfigForm renders the configuration form HTML
-func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) {
- config := ui.getCurrentECConfig()
-
- // Build form using the FormBuilder helper
- form := types.NewFormBuilder()
-
- // Detection Settings
- form.AddCheckboxField(
- "enabled",
- "Enable Erasure Coding Tasks",
- "Whether erasure coding tasks should be automatically created",
- config.Enabled,
- )
-
- form.AddNumberField(
- "volume_age_hours_seconds",
- "Volume Age Threshold",
- "Only apply erasure coding to volumes older than this duration",
- float64(config.VolumeAgeHoursSeconds),
- true,
- )
-
- form.AddNumberField(
- "scan_interval_seconds",
- "Scan Interval",
- "How often to scan for volumes needing erasure coding",
- float64(config.ScanIntervalSeconds),
- true,
- )
-
- // Scheduling Settings
- form.AddNumberField(
- "max_concurrent",
- "Max Concurrent Tasks",
- "Maximum number of erasure coding tasks that can run simultaneously",
- float64(config.MaxConcurrent),
- true,
- )
-
- // Erasure Coding Parameters
- form.AddNumberField(
- "shard_count",
- "Data Shards",
- "Number of data shards for erasure coding (recommended: 10)",
- float64(config.ShardCount),
- true,
- )
-
- form.AddNumberField(
- "parity_count",
- "Parity Shards",
- "Number of parity shards for erasure coding (recommended: 4)",
- float64(config.ParityCount),
- true,
- )
-
- // Generate organized form sections using Bootstrap components
- html := `
-<div class="row">
- <div class="col-12">
- <div class="card mb-4">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-shield-alt me-2"></i>
- Erasure Coding Configuration
- </h5>
- </div>
- <div class="card-body">
-` + string(form.Build()) + `
- </div>
- </div>
- </div>
-</div>
-
-<div class="row">
- <div class="col-12">
- <div class="card mb-3">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-info-circle me-2"></i>
- Performance Impact
- </h5>
- </div>
- <div class="card-body">
- <div class="alert alert-info" role="alert">
- <h6 class="alert-heading">Important Notes:</h6>
- <p class="mb-2"><strong>Performance:</strong> Erasure coding is CPU and I/O intensive. Consider running during off-peak hours.</p>
- <p class="mb-0"><strong>Durability:</strong> With ` + fmt.Sprintf("%d+%d", config.ShardCount, config.ParityCount) + ` configuration, can tolerate up to ` + fmt.Sprintf("%d", config.ParityCount) + ` shard failures.</p>
- </div>
- </div>
- </div>
- </div>
-</div>`
-
- return template.HTML(html), nil
-}
-
-// ParseConfigForm parses form data into configuration
-func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) {
- config := &ErasureCodingConfig{}
-
- // Parse enabled
- config.Enabled = len(formData["enabled"]) > 0
-
- // Parse volume age hours
- if values, ok := formData["volume_age_hours_seconds"]; ok && len(values) > 0 {
- hours, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid volume age hours: %w", err)
- }
- config.VolumeAgeHoursSeconds = hours
- }
-
- // Parse scan interval
- if values, ok := formData["scan_interval_seconds"]; ok && len(values) > 0 {
- interval, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid scan interval: %w", err)
- }
- config.ScanIntervalSeconds = interval
- }
-
- // Parse max concurrent
- if values, ok := formData["max_concurrent"]; ok && len(values) > 0 {
- maxConcurrent, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid max concurrent: %w", err)
- }
- if maxConcurrent < 1 {
- return nil, fmt.Errorf("max concurrent must be at least 1")
- }
- config.MaxConcurrent = maxConcurrent
- }
-
- // Parse shard count
- if values, ok := formData["shard_count"]; ok && len(values) > 0 {
- shardCount, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid shard count: %w", err)
- }
- if shardCount < 1 {
- return nil, fmt.Errorf("shard count must be at least 1")
- }
- config.ShardCount = shardCount
- }
-
- // Parse parity count
- if values, ok := formData["parity_count"]; ok && len(values) > 0 {
- parityCount, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid parity count: %w", err)
- }
- if parityCount < 1 {
- return nil, fmt.Errorf("parity count must be at least 1")
- }
- config.ParityCount = parityCount
- }
-
- return config, nil
-}
-
-// GetCurrentConfig returns the current configuration
-func (ui *UIProvider) GetCurrentConfig() interface{} {
- return ui.getCurrentECConfig()
-}
-
-// ApplyConfig applies the new configuration
-func (ui *UIProvider) ApplyConfig(config interface{}) error {
- ecConfig, ok := config.(ErasureCodingConfig)
- if !ok {
- return fmt.Errorf("invalid config type, expected ErasureCodingConfig")
- }
-
- // Apply to detector
- if ui.detector != nil {
- ui.detector.SetEnabled(ecConfig.Enabled)
- ui.detector.SetVolumeAgeHours(ecConfig.VolumeAgeHoursSeconds)
- ui.detector.SetScanInterval(secondsToDuration(ecConfig.ScanIntervalSeconds))
- }
-
- // Apply to scheduler
- if ui.scheduler != nil {
- ui.scheduler.SetEnabled(ecConfig.Enabled)
- ui.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent)
- }
-
- glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, age_threshold=%v, max_concurrent=%d, shards=%d+%d",
- ecConfig.Enabled, ecConfig.VolumeAgeHoursSeconds, ecConfig.MaxConcurrent, ecConfig.ShardCount, ecConfig.ParityCount)
-
- return nil
-}
-
-// getCurrentECConfig gets the current configuration from detector and scheduler
-func (ui *UIProvider) getCurrentECConfig() ErasureCodingConfig {
- config := ErasureCodingConfig{
- // Default values (fallback if detectors/schedulers are nil)
- Enabled: true,
- VolumeAgeHoursSeconds: 24 * 3600, // 24 hours in seconds
- ScanIntervalSeconds: 2 * 3600, // 2 hours in seconds
- MaxConcurrent: 1,
- ShardCount: 10,
- ParityCount: 4,
- }
-
- // Get current values from detector
- if ui.detector != nil {
- config.Enabled = ui.detector.IsEnabled()
- config.VolumeAgeHoursSeconds = ui.detector.GetVolumeAgeHours()
- config.ScanIntervalSeconds = durationToSeconds(ui.detector.ScanInterval())
- }
-
- // Get current values from scheduler
- if ui.scheduler != nil {
- config.MaxConcurrent = ui.scheduler.GetMaxConcurrent()
- }
-
- return config
-}
-
-// RegisterUI registers the erasure coding UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *EcDetector, scheduler *Scheduler) {
- uiProvider := NewUIProvider(detector, scheduler)
- uiRegistry.RegisterUI(uiProvider)
-
- glog.V(1).Infof("✅ Registered erasure coding task UI provider")
-}