aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/erasure_coding
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/erasure_coding')
-rw-r--r--weed/worker/tasks/erasure_coding/ec.go79
-rw-r--r--weed/worker/tasks/erasure_coding/ec_detector.go139
-rw-r--r--weed/worker/tasks/erasure_coding/ec_register.go81
-rw-r--r--weed/worker/tasks/erasure_coding/ec_scheduler.go114
-rw-r--r--weed/worker/tasks/erasure_coding/ui.go309
-rw-r--r--weed/worker/tasks/erasure_coding/ui_templ.go319
6 files changed, 1041 insertions, 0 deletions
diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go
new file mode 100644
index 000000000..641dfc6b5
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/ec.go
@@ -0,0 +1,79 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Task implements erasure coding operation to convert volumes to EC format
+type Task struct {
+ *tasks.BaseTask
+ server string
+ volumeID uint32
+}
+
+// NewTask creates a new erasure coding task instance
+func NewTask(server string, volumeID uint32) *Task {
+ task := &Task{
+ BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding),
+ server: server,
+ volumeID: volumeID,
+ }
+ return task
+}
+
+// Execute executes the erasure coding task
+func (t *Task) Execute(params types.TaskParams) error {
+ glog.Infof("Starting erasure coding task for volume %d on server %s", t.volumeID, t.server)
+
+ // Simulate erasure coding operation with progress updates
+ steps := []struct {
+ name string
+ duration time.Duration
+ progress float64
+ }{
+ {"Analyzing volume", 2 * time.Second, 15},
+ {"Creating EC shards", 5 * time.Second, 50},
+ {"Verifying shards", 2 * time.Second, 75},
+ {"Finalizing EC volume", 1 * time.Second, 100},
+ }
+
+ for _, step := range steps {
+ if t.IsCancelled() {
+ return fmt.Errorf("erasure coding task cancelled")
+ }
+
+ glog.V(1).Infof("Erasure coding task step: %s", step.name)
+ t.SetProgress(step.progress)
+
+ // Simulate work
+ time.Sleep(step.duration)
+ }
+
+ glog.Infof("Erasure coding task completed for volume %d on server %s", t.volumeID, t.server)
+ return nil
+}
+
+// Validate validates the task parameters
+func (t *Task) Validate(params types.TaskParams) error {
+ if params.VolumeID == 0 {
+ return fmt.Errorf("volume_id is required")
+ }
+ if params.Server == "" {
+ return fmt.Errorf("server is required")
+ }
+ return nil
+}
+
+// EstimateTime estimates the time needed for the task
+func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
+ // Base time for erasure coding operation
+ baseTime := 30 * time.Second
+
+ // Could adjust based on volume size or other factors
+ return baseTime
+}
diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go
new file mode 100644
index 000000000..0f8b5e376
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/ec_detector.go
@@ -0,0 +1,139 @@
+package erasure_coding
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// EcDetector implements erasure coding task detection
+type EcDetector struct {
+ enabled bool
+ volumeAgeHours int
+ fullnessRatio float64
+ scanInterval time.Duration
+}
+
+// Compile-time interface assertions
+var (
+ _ types.TaskDetector = (*EcDetector)(nil)
+)
+
+// NewEcDetector creates a new erasure coding detector
+func NewEcDetector() *EcDetector {
+ return &EcDetector{
+ enabled: false, // Conservative default
+ volumeAgeHours: 24 * 7, // 1 week
+ fullnessRatio: 0.9, // 90% full
+ scanInterval: 2 * time.Hour,
+ }
+}
+
+// GetTaskType returns the task type
+func (d *EcDetector) GetTaskType() types.TaskType {
+ return types.TaskTypeErasureCoding
+}
+
+// ScanForTasks scans for volumes that should be converted to erasure coding
+func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
+ if !d.enabled {
+ return nil, nil
+ }
+
+ var results []*types.TaskDetectionResult
+ now := time.Now()
+ ageThreshold := time.Duration(d.volumeAgeHours) * time.Hour
+
+ for _, metric := range volumeMetrics {
+ // Skip if already EC volume
+ if metric.IsECVolume {
+ continue
+ }
+
+ // Check age and fullness criteria
+ if metric.Age >= ageThreshold && metric.FullnessRatio >= d.fullnessRatio {
+ // Check if volume is read-only (safe for EC conversion)
+ if !metric.IsReadOnly {
+ continue
+ }
+
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeErasureCoding,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: types.TaskPriorityLow, // EC is not urgent
+ Reason: "Volume is old and full enough for EC conversion",
+ Parameters: map[string]interface{}{
+ "age_hours": int(metric.Age.Hours()),
+ "fullness_ratio": metric.FullnessRatio,
+ },
+ ScheduleAt: now,
+ }
+ results = append(results, result)
+ }
+ }
+
+ glog.V(2).Infof("EC detector found %d tasks to schedule", len(results))
+ return results, nil
+}
+
+// ScanInterval returns how often this task type should be scanned
+func (d *EcDetector) ScanInterval() time.Duration {
+ return d.scanInterval
+}
+
+// IsEnabled returns whether this task type is enabled
+func (d *EcDetector) IsEnabled() bool {
+ return d.enabled
+}
+
+// Configuration setters
+
+func (d *EcDetector) SetEnabled(enabled bool) {
+ d.enabled = enabled
+}
+
+func (d *EcDetector) SetVolumeAgeHours(hours int) {
+ d.volumeAgeHours = hours
+}
+
+func (d *EcDetector) SetFullnessRatio(ratio float64) {
+ d.fullnessRatio = ratio
+}
+
+func (d *EcDetector) SetScanInterval(interval time.Duration) {
+ d.scanInterval = interval
+}
+
+// GetVolumeAgeHours returns the current volume age threshold in hours
+func (d *EcDetector) GetVolumeAgeHours() int {
+ return d.volumeAgeHours
+}
+
+// GetFullnessRatio returns the current fullness ratio threshold
+func (d *EcDetector) GetFullnessRatio() float64 {
+ return d.fullnessRatio
+}
+
+// GetScanInterval returns the scan interval
+func (d *EcDetector) GetScanInterval() time.Duration {
+ return d.scanInterval
+}
+
+// ConfigureFromPolicy configures the detector based on the maintenance policy
+func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
+ // Type assert to the maintenance policy type we expect
+ if maintenancePolicy, ok := policy.(interface {
+ GetECEnabled() bool
+ GetECVolumeAgeHours() int
+ GetECFullnessRatio() float64
+ }); ok {
+ d.SetEnabled(maintenancePolicy.GetECEnabled())
+ d.SetVolumeAgeHours(maintenancePolicy.GetECVolumeAgeHours())
+ d.SetFullnessRatio(maintenancePolicy.GetECFullnessRatio())
+ } else {
+ glog.V(1).Infof("Could not configure EC detector from policy: unsupported policy type")
+ }
+}
diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go
new file mode 100644
index 000000000..6c4b5bf7f
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/ec_register.go
@@ -0,0 +1,81 @@
+package erasure_coding
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Factory creates erasure coding task instances
+type Factory struct {
+ *tasks.BaseTaskFactory
+}
+
+// NewFactory creates a new erasure coding task factory
+func NewFactory() *Factory {
+ return &Factory{
+ BaseTaskFactory: tasks.NewBaseTaskFactory(
+ types.TaskTypeErasureCoding,
+ []string{"erasure_coding", "storage", "durability"},
+ "Convert volumes to erasure coded format for improved durability",
+ ),
+ }
+}
+
+// Create creates a new erasure coding task instance
+func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
+ // Validate parameters
+ if params.VolumeID == 0 {
+ return nil, fmt.Errorf("volume_id is required")
+ }
+ if params.Server == "" {
+ return nil, fmt.Errorf("server is required")
+ }
+
+ task := NewTask(params.Server, params.VolumeID)
+ task.SetEstimatedDuration(task.EstimateTime(params))
+
+ return task, nil
+}
+
+// Shared detector and scheduler instances
+var (
+ sharedDetector *EcDetector
+ sharedScheduler *Scheduler
+)
+
+// getSharedInstances returns the shared detector and scheduler instances
+func getSharedInstances() (*EcDetector, *Scheduler) {
+ if sharedDetector == nil {
+ sharedDetector = NewEcDetector()
+ }
+ if sharedScheduler == nil {
+ sharedScheduler = NewScheduler()
+ }
+ return sharedDetector, sharedScheduler
+}
+
+// GetSharedInstances returns the shared detector and scheduler instances (public access)
+func GetSharedInstances() (*EcDetector, *Scheduler) {
+ return getSharedInstances()
+}
+
+// Auto-register this task when the package is imported
+func init() {
+ factory := NewFactory()
+ tasks.AutoRegister(types.TaskTypeErasureCoding, factory)
+
+ // Get shared instances for all registrations
+ detector, scheduler := getSharedInstances()
+
+ // Register with types registry
+ tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
+ registry.RegisterTask(detector, scheduler)
+ })
+
+ // Register with UI registry using the same instances
+ tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
+ RegisterUI(uiRegistry, detector, scheduler)
+ })
+}
diff --git a/weed/worker/tasks/erasure_coding/ec_scheduler.go b/weed/worker/tasks/erasure_coding/ec_scheduler.go
new file mode 100644
index 000000000..b2366bb06
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/ec_scheduler.go
@@ -0,0 +1,114 @@
+package erasure_coding
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Scheduler implements erasure coding task scheduling
+type Scheduler struct {
+ maxConcurrent int
+ enabled bool
+}
+
+// NewScheduler creates a new erasure coding scheduler
+func NewScheduler() *Scheduler {
+ return &Scheduler{
+ maxConcurrent: 1, // Conservative default
+ enabled: false, // Conservative default
+ }
+}
+
+// GetTaskType returns the task type
+func (s *Scheduler) GetTaskType() types.TaskType {
+ return types.TaskTypeErasureCoding
+}
+
+// CanScheduleNow determines if an erasure coding task can be scheduled now
+func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if !s.enabled {
+ return false
+ }
+
+ // Check if we have available workers
+ if len(availableWorkers) == 0 {
+ return false
+ }
+
+ // Count running EC tasks
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeErasureCoding {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningCount >= s.maxConcurrent {
+ glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent)
+ return false
+ }
+
+ // Check if any worker can handle EC tasks
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeErasureCoding {
+ glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID)
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// GetMaxConcurrent returns the maximum number of concurrent tasks
+func (s *Scheduler) GetMaxConcurrent() int {
+ return s.maxConcurrent
+}
+
+// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks
+func (s *Scheduler) GetDefaultRepeatInterval() time.Duration {
+ return 24 * time.Hour // Don't repeat EC for 24 hours
+}
+
+// GetPriority returns the priority for this task
+func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
+ return types.TaskPriorityLow // EC is not urgent
+}
+
+// WasTaskRecentlyCompleted checks if a similar task was recently completed
+func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool {
+ // Don't repeat EC for 24 hours
+ interval := 24 * time.Hour
+ cutoff := now.Add(-interval)
+
+ for _, completedTask := range completedTasks {
+ if completedTask.Type == types.TaskTypeErasureCoding &&
+ completedTask.VolumeID == task.VolumeID &&
+ completedTask.Server == task.Server &&
+ completedTask.Status == types.TaskStatusCompleted &&
+ completedTask.CompletedAt != nil &&
+ completedTask.CompletedAt.After(cutoff) {
+ return true
+ }
+ }
+ return false
+}
+
+// IsEnabled returns whether this task type is enabled
+func (s *Scheduler) IsEnabled() bool {
+ return s.enabled
+}
+
+// Configuration setters
+
+func (s *Scheduler) SetEnabled(enabled bool) {
+ s.enabled = enabled
+}
+
+func (s *Scheduler) SetMaxConcurrent(max int) {
+ s.maxConcurrent = max
+}
diff --git a/weed/worker/tasks/erasure_coding/ui.go b/weed/worker/tasks/erasure_coding/ui.go
new file mode 100644
index 000000000..8a4640cf8
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/ui.go
@@ -0,0 +1,309 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "html/template"
+ "strconv"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// UIProvider provides the UI for erasure coding task configuration
+type UIProvider struct {
+ detector *EcDetector
+ scheduler *Scheduler
+}
+
+// NewUIProvider creates a new erasure coding UI provider
+func NewUIProvider(detector *EcDetector, scheduler *Scheduler) *UIProvider {
+ return &UIProvider{
+ detector: detector,
+ scheduler: scheduler,
+ }
+}
+
+// GetTaskType returns the task type
+func (ui *UIProvider) GetTaskType() types.TaskType {
+ return types.TaskTypeErasureCoding
+}
+
+// GetDisplayName returns the human-readable name
+func (ui *UIProvider) GetDisplayName() string {
+ return "Erasure Coding"
+}
+
+// GetDescription returns a description of what this task does
+func (ui *UIProvider) GetDescription() string {
+ return "Converts volumes to erasure coded format for improved data durability and fault tolerance"
+}
+
+// GetIcon returns the icon CSS class for this task type
+func (ui *UIProvider) GetIcon() string {
+ return "fas fa-shield-alt text-info"
+}
+
+// ErasureCodingConfig represents the erasure coding configuration
+type ErasureCodingConfig struct {
+ Enabled bool `json:"enabled"`
+ VolumeAgeHoursSeconds int `json:"volume_age_hours_seconds"`
+ FullnessRatio float64 `json:"fullness_ratio"`
+ ScanIntervalSeconds int `json:"scan_interval_seconds"`
+ MaxConcurrent int `json:"max_concurrent"`
+ ShardCount int `json:"shard_count"`
+ ParityCount int `json:"parity_count"`
+ CollectionFilter string `json:"collection_filter"`
+}
+
+// Helper functions for duration conversion
+func secondsToDuration(seconds int) time.Duration {
+ return time.Duration(seconds) * time.Second
+}
+
+func durationToSeconds(d time.Duration) int {
+ return int(d.Seconds())
+}
+
+// formatDurationForUser formats seconds as a user-friendly duration string
+func formatDurationForUser(seconds int) string {
+ d := secondsToDuration(seconds)
+ if d < time.Minute {
+ return fmt.Sprintf("%ds", seconds)
+ }
+ if d < time.Hour {
+ return fmt.Sprintf("%.0fm", d.Minutes())
+ }
+ if d < 24*time.Hour {
+ return fmt.Sprintf("%.1fh", d.Hours())
+ }
+ return fmt.Sprintf("%.1fd", d.Hours()/24)
+}
+
+// RenderConfigForm renders the configuration form HTML
+func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) {
+ config := ui.getCurrentECConfig()
+
+ // Build form using the FormBuilder helper
+ form := types.NewFormBuilder()
+
+ // Detection Settings
+ form.AddCheckboxField(
+ "enabled",
+ "Enable Erasure Coding Tasks",
+ "Whether erasure coding tasks should be automatically created",
+ config.Enabled,
+ )
+
+ form.AddNumberField(
+ "volume_age_hours_seconds",
+ "Volume Age Threshold",
+ "Only apply erasure coding to volumes older than this duration",
+ float64(config.VolumeAgeHoursSeconds),
+ true,
+ )
+
+ form.AddNumberField(
+ "scan_interval_seconds",
+ "Scan Interval",
+ "How often to scan for volumes needing erasure coding",
+ float64(config.ScanIntervalSeconds),
+ true,
+ )
+
+ // Scheduling Settings
+ form.AddNumberField(
+ "max_concurrent",
+ "Max Concurrent Tasks",
+ "Maximum number of erasure coding tasks that can run simultaneously",
+ float64(config.MaxConcurrent),
+ true,
+ )
+
+ // Erasure Coding Parameters
+ form.AddNumberField(
+ "shard_count",
+ "Data Shards",
+ "Number of data shards for erasure coding (recommended: 10)",
+ float64(config.ShardCount),
+ true,
+ )
+
+ form.AddNumberField(
+ "parity_count",
+ "Parity Shards",
+ "Number of parity shards for erasure coding (recommended: 4)",
+ float64(config.ParityCount),
+ true,
+ )
+
+ // Generate organized form sections using Bootstrap components
+ html := `
+<div class="row">
+ <div class="col-12">
+ <div class="card mb-4">
+ <div class="card-header">
+ <h5 class="mb-0">
+ <i class="fas fa-shield-alt me-2"></i>
+ Erasure Coding Configuration
+ </h5>
+ </div>
+ <div class="card-body">
+` + string(form.Build()) + `
+ </div>
+ </div>
+ </div>
+</div>
+
+<div class="row">
+ <div class="col-12">
+ <div class="card mb-3">
+ <div class="card-header">
+ <h5 class="mb-0">
+ <i class="fas fa-info-circle me-2"></i>
+ Performance Impact
+ </h5>
+ </div>
+ <div class="card-body">
+ <div class="alert alert-info" role="alert">
+ <h6 class="alert-heading">Important Notes:</h6>
+ <p class="mb-2"><strong>Performance:</strong> Erasure coding is CPU and I/O intensive. Consider running during off-peak hours.</p>
+ <p class="mb-0"><strong>Durability:</strong> With ` + fmt.Sprintf("%d+%d", config.ShardCount, config.ParityCount) + ` configuration, can tolerate up to ` + fmt.Sprintf("%d", config.ParityCount) + ` shard failures.</p>
+ </div>
+ </div>
+ </div>
+ </div>
+</div>`
+
+ return template.HTML(html), nil
+}
+
+// ParseConfigForm parses form data into configuration
+func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) {
+ config := &ErasureCodingConfig{}
+
+ // Parse enabled
+ config.Enabled = len(formData["enabled"]) > 0
+
+ // Parse volume age hours
+ if values, ok := formData["volume_age_hours_seconds"]; ok && len(values) > 0 {
+ hours, err := strconv.Atoi(values[0])
+ if err != nil {
+ return nil, fmt.Errorf("invalid volume age hours: %v", err)
+ }
+ config.VolumeAgeHoursSeconds = hours
+ }
+
+ // Parse scan interval
+ if values, ok := formData["scan_interval_seconds"]; ok && len(values) > 0 {
+ interval, err := strconv.Atoi(values[0])
+ if err != nil {
+ return nil, fmt.Errorf("invalid scan interval: %v", err)
+ }
+ config.ScanIntervalSeconds = interval
+ }
+
+ // Parse max concurrent
+ if values, ok := formData["max_concurrent"]; ok && len(values) > 0 {
+ maxConcurrent, err := strconv.Atoi(values[0])
+ if err != nil {
+ return nil, fmt.Errorf("invalid max concurrent: %v", err)
+ }
+ if maxConcurrent < 1 {
+ return nil, fmt.Errorf("max concurrent must be at least 1")
+ }
+ config.MaxConcurrent = maxConcurrent
+ }
+
+ // Parse shard count
+ if values, ok := formData["shard_count"]; ok && len(values) > 0 {
+ shardCount, err := strconv.Atoi(values[0])
+ if err != nil {
+ return nil, fmt.Errorf("invalid shard count: %v", err)
+ }
+ if shardCount < 1 {
+ return nil, fmt.Errorf("shard count must be at least 1")
+ }
+ config.ShardCount = shardCount
+ }
+
+ // Parse parity count
+ if values, ok := formData["parity_count"]; ok && len(values) > 0 {
+ parityCount, err := strconv.Atoi(values[0])
+ if err != nil {
+ return nil, fmt.Errorf("invalid parity count: %v", err)
+ }
+ if parityCount < 1 {
+ return nil, fmt.Errorf("parity count must be at least 1")
+ }
+ config.ParityCount = parityCount
+ }
+
+ return config, nil
+}
+
+// GetCurrentConfig returns the current configuration
+func (ui *UIProvider) GetCurrentConfig() interface{} {
+ return ui.getCurrentECConfig()
+}
+
+// ApplyConfig applies the new configuration
+func (ui *UIProvider) ApplyConfig(config interface{}) error {
+ ecConfig, ok := config.(ErasureCodingConfig)
+ if !ok {
+ return fmt.Errorf("invalid config type, expected ErasureCodingConfig")
+ }
+
+ // Apply to detector
+ if ui.detector != nil {
+ ui.detector.SetEnabled(ecConfig.Enabled)
+ ui.detector.SetVolumeAgeHours(ecConfig.VolumeAgeHoursSeconds)
+ ui.detector.SetScanInterval(secondsToDuration(ecConfig.ScanIntervalSeconds))
+ }
+
+ // Apply to scheduler
+ if ui.scheduler != nil {
+ ui.scheduler.SetEnabled(ecConfig.Enabled)
+ ui.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent)
+ }
+
+ glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, age_threshold=%v, max_concurrent=%d, shards=%d+%d",
+ ecConfig.Enabled, ecConfig.VolumeAgeHoursSeconds, ecConfig.MaxConcurrent, ecConfig.ShardCount, ecConfig.ParityCount)
+
+ return nil
+}
+
+// getCurrentECConfig gets the current configuration from detector and scheduler
+func (ui *UIProvider) getCurrentECConfig() ErasureCodingConfig {
+ config := ErasureCodingConfig{
+ // Default values (fallback if detectors/schedulers are nil)
+ Enabled: true,
+ VolumeAgeHoursSeconds: 24 * 3600, // 24 hours in seconds
+ ScanIntervalSeconds: 2 * 3600, // 2 hours in seconds
+ MaxConcurrent: 1,
+ ShardCount: 10,
+ ParityCount: 4,
+ }
+
+ // Get current values from detector
+ if ui.detector != nil {
+ config.Enabled = ui.detector.IsEnabled()
+ config.VolumeAgeHoursSeconds = ui.detector.GetVolumeAgeHours()
+ config.ScanIntervalSeconds = durationToSeconds(ui.detector.ScanInterval())
+ }
+
+ // Get current values from scheduler
+ if ui.scheduler != nil {
+ config.MaxConcurrent = ui.scheduler.GetMaxConcurrent()
+ }
+
+ return config
+}
+
+// RegisterUI registers the erasure coding UI provider with the UI registry
+func RegisterUI(uiRegistry *types.UIRegistry, detector *EcDetector, scheduler *Scheduler) {
+ uiProvider := NewUIProvider(detector, scheduler)
+ uiRegistry.RegisterUI(uiProvider)
+
+ glog.V(1).Infof("✅ Registered erasure coding task UI provider")
+}
diff --git a/weed/worker/tasks/erasure_coding/ui_templ.go b/weed/worker/tasks/erasure_coding/ui_templ.go
new file mode 100644
index 000000000..12c3d199e
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/ui_templ.go
@@ -0,0 +1,319 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "strconv"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/view/components"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Helper function to format seconds as duration string
+func formatDurationFromSeconds(seconds int) string {
+ d := time.Duration(seconds) * time.Second
+ return d.String()
+}
+
+// Helper function to convert value and unit to seconds
+func valueAndUnitToSeconds(value float64, unit string) int {
+ switch unit {
+ case "days":
+ return int(value * 24 * 60 * 60)
+ case "hours":
+ return int(value * 60 * 60)
+ case "minutes":
+ return int(value * 60)
+ default:
+ return int(value * 60) // Default to minutes
+ }
+}
+
+// UITemplProvider provides the templ-based UI for erasure coding task configuration
+type UITemplProvider struct {
+ detector *EcDetector
+ scheduler *Scheduler
+}
+
+// NewUITemplProvider creates a new erasure coding templ UI provider
+func NewUITemplProvider(detector *EcDetector, scheduler *Scheduler) *UITemplProvider {
+ return &UITemplProvider{
+ detector: detector,
+ scheduler: scheduler,
+ }
+}
+
+// ErasureCodingConfig is defined in ui.go - we reuse it
+
+// GetTaskType returns the task type
+func (ui *UITemplProvider) GetTaskType() types.TaskType {
+ return types.TaskTypeErasureCoding
+}
+
+// GetDisplayName returns the human-readable name
+func (ui *UITemplProvider) GetDisplayName() string {
+ return "Erasure Coding"
+}
+
+// GetDescription returns a description of what this task does
+func (ui *UITemplProvider) GetDescription() string {
+ return "Converts replicated volumes to erasure-coded format for efficient storage"
+}
+
+// GetIcon returns the icon CSS class for this task type
+func (ui *UITemplProvider) GetIcon() string {
+ return "fas fa-shield-alt text-info"
+}
+
+// RenderConfigSections renders the configuration as templ section data
+func (ui *UITemplProvider) RenderConfigSections(currentConfig interface{}) ([]components.ConfigSectionData, error) {
+ config := ui.getCurrentECConfig()
+
+ // Detection settings section
+ detectionSection := components.ConfigSectionData{
+ Title: "Detection Settings",
+ Icon: "fas fa-search",
+ Description: "Configure when erasure coding tasks should be triggered",
+ Fields: []interface{}{
+ components.CheckboxFieldData{
+ FormFieldData: components.FormFieldData{
+ Name: "enabled",
+ Label: "Enable Erasure Coding Tasks",
+ Description: "Whether erasure coding tasks should be automatically created",
+ },
+ Checked: config.Enabled,
+ },
+ components.DurationInputFieldData{
+ FormFieldData: components.FormFieldData{
+ Name: "scan_interval",
+ Label: "Scan Interval",
+ Description: "How often to scan for volumes needing erasure coding",
+ Required: true,
+ },
+ Seconds: config.ScanIntervalSeconds,
+ },
+ components.DurationInputFieldData{
+ FormFieldData: components.FormFieldData{
+ Name: "volume_age_threshold",
+ Label: "Volume Age Threshold",
+ Description: "Only apply erasure coding to volumes older than this age",
+ Required: true,
+ },
+ Seconds: config.VolumeAgeHoursSeconds,
+ },
+ },
+ }
+
+ // Erasure coding parameters section
+ paramsSection := components.ConfigSectionData{
+ Title: "Erasure Coding Parameters",
+ Icon: "fas fa-cogs",
+ Description: "Configure erasure coding scheme and performance",
+ Fields: []interface{}{
+ components.NumberFieldData{
+ FormFieldData: components.FormFieldData{
+ Name: "data_shards",
+ Label: "Data Shards",
+ Description: "Number of data shards in the erasure coding scheme",
+ Required: true,
+ },
+ Value: float64(config.ShardCount),
+ Step: "1",
+ Min: floatPtr(1),
+ Max: floatPtr(16),
+ },
+ components.NumberFieldData{
+ FormFieldData: components.FormFieldData{
+ Name: "parity_shards",
+ Label: "Parity Shards",
+ Description: "Number of parity shards (determines fault tolerance)",
+ Required: true,
+ },
+ Value: float64(config.ParityCount),
+ Step: "1",
+ Min: floatPtr(1),
+ Max: floatPtr(16),
+ },
+ components.NumberFieldData{
+ FormFieldData: components.FormFieldData{
+ Name: "max_concurrent",
+ Label: "Max Concurrent Tasks",
+ Description: "Maximum number of erasure coding tasks that can run simultaneously",
+ Required: true,
+ },
+ Value: float64(config.MaxConcurrent),
+ Step: "1",
+ Min: floatPtr(1),
+ },
+ },
+ }
+
+ // Performance impact info section
+ infoSection := components.ConfigSectionData{
+ Title: "Performance Impact",
+ Icon: "fas fa-info-circle",
+ Description: "Important information about erasure coding operations",
+ Fields: []interface{}{
+ components.TextFieldData{
+ FormFieldData: components.FormFieldData{
+ Name: "durability_info",
+ Label: "Durability",
+ Description: fmt.Sprintf("With %d+%d configuration, can tolerate up to %d shard failures",
+ config.ShardCount, config.ParityCount, config.ParityCount),
+ },
+ Value: "High durability with space efficiency",
+ },
+ components.TextFieldData{
+ FormFieldData: components.FormFieldData{
+ Name: "performance_info",
+ Label: "Performance Note",
+ Description: "Erasure coding is CPU and I/O intensive. Consider running during off-peak hours",
+ },
+ Value: "Schedule during low-traffic periods",
+ },
+ },
+ }
+
+ return []components.ConfigSectionData{detectionSection, paramsSection, infoSection}, nil
+}
+
+// ParseConfigForm parses form data into configuration
+func (ui *UITemplProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) {
+ config := &ErasureCodingConfig{}
+
+ // Parse enabled checkbox
+ config.Enabled = len(formData["enabled"]) > 0 && formData["enabled"][0] == "on"
+
+ // Parse volume age threshold
+ if valueStr := formData["volume_age_threshold"]; len(valueStr) > 0 {
+ if value, err := strconv.ParseFloat(valueStr[0], 64); err != nil {
+ return nil, fmt.Errorf("invalid volume age threshold value: %v", err)
+ } else {
+ unit := "hours" // default
+ if unitStr := formData["volume_age_threshold_unit"]; len(unitStr) > 0 {
+ unit = unitStr[0]
+ }
+ config.VolumeAgeHoursSeconds = valueAndUnitToSeconds(value, unit)
+ }
+ }
+
+ // Parse scan interval
+ if valueStr := formData["scan_interval"]; len(valueStr) > 0 {
+ if value, err := strconv.ParseFloat(valueStr[0], 64); err != nil {
+ return nil, fmt.Errorf("invalid scan interval value: %v", err)
+ } else {
+ unit := "hours" // default
+ if unitStr := formData["scan_interval_unit"]; len(unitStr) > 0 {
+ unit = unitStr[0]
+ }
+ config.ScanIntervalSeconds = valueAndUnitToSeconds(value, unit)
+ }
+ }
+
+ // Parse data shards
+ if shardsStr := formData["data_shards"]; len(shardsStr) > 0 {
+ if shards, err := strconv.Atoi(shardsStr[0]); err != nil {
+ return nil, fmt.Errorf("invalid data shards: %v", err)
+ } else if shards < 1 || shards > 16 {
+ return nil, fmt.Errorf("data shards must be between 1 and 16")
+ } else {
+ config.ShardCount = shards
+ }
+ }
+
+ // Parse parity shards
+ if shardsStr := formData["parity_shards"]; len(shardsStr) > 0 {
+ if shards, err := strconv.Atoi(shardsStr[0]); err != nil {
+ return nil, fmt.Errorf("invalid parity shards: %v", err)
+ } else if shards < 1 || shards > 16 {
+ return nil, fmt.Errorf("parity shards must be between 1 and 16")
+ } else {
+ config.ParityCount = shards
+ }
+ }
+
+ // Parse max concurrent
+ if concurrentStr := formData["max_concurrent"]; len(concurrentStr) > 0 {
+ if concurrent, err := strconv.Atoi(concurrentStr[0]); err != nil {
+ return nil, fmt.Errorf("invalid max concurrent: %v", err)
+ } else if concurrent < 1 {
+ return nil, fmt.Errorf("max concurrent must be at least 1")
+ } else {
+ config.MaxConcurrent = concurrent
+ }
+ }
+
+ return config, nil
+}
+
+// GetCurrentConfig returns the current configuration
+func (ui *UITemplProvider) GetCurrentConfig() interface{} {
+ return ui.getCurrentECConfig()
+}
+
+// ApplyConfig applies the new configuration
+func (ui *UITemplProvider) ApplyConfig(config interface{}) error {
+ ecConfig, ok := config.(*ErasureCodingConfig)
+ if !ok {
+ return fmt.Errorf("invalid config type, expected *ErasureCodingConfig")
+ }
+
+ // Apply to detector
+ if ui.detector != nil {
+ ui.detector.SetEnabled(ecConfig.Enabled)
+ ui.detector.SetVolumeAgeHours(ecConfig.VolumeAgeHoursSeconds)
+ ui.detector.SetScanInterval(time.Duration(ecConfig.ScanIntervalSeconds) * time.Second)
+ }
+
+ // Apply to scheduler
+ if ui.scheduler != nil {
+ ui.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent)
+ ui.scheduler.SetEnabled(ecConfig.Enabled)
+ }
+
+ glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, age_threshold=%ds, max_concurrent=%d",
+ ecConfig.Enabled, ecConfig.VolumeAgeHoursSeconds, ecConfig.MaxConcurrent)
+
+ return nil
+}
+
+// getCurrentECConfig gets the current configuration from detector and scheduler
+func (ui *UITemplProvider) getCurrentECConfig() *ErasureCodingConfig {
+ config := &ErasureCodingConfig{
+ // Default values (fallback if detectors/schedulers are nil)
+ Enabled: true,
+ VolumeAgeHoursSeconds: int((24 * time.Hour).Seconds()),
+ ScanIntervalSeconds: int((2 * time.Hour).Seconds()),
+ MaxConcurrent: 1,
+ ShardCount: 10,
+ ParityCount: 4,
+ }
+
+ // Get current values from detector
+ if ui.detector != nil {
+ config.Enabled = ui.detector.IsEnabled()
+ config.VolumeAgeHoursSeconds = ui.detector.GetVolumeAgeHours()
+ config.ScanIntervalSeconds = int(ui.detector.ScanInterval().Seconds())
+ }
+
+ // Get current values from scheduler
+ if ui.scheduler != nil {
+ config.MaxConcurrent = ui.scheduler.GetMaxConcurrent()
+ }
+
+ return config
+}
+
+// floatPtr is a helper function to create float64 pointers
+func floatPtr(f float64) *float64 {
+ return &f
+}
+
+// RegisterUITempl registers the erasure coding templ UI provider with the UI registry
+func RegisterUITempl(uiRegistry *types.UITemplRegistry, detector *EcDetector, scheduler *Scheduler) {
+ uiProvider := NewUITemplProvider(detector, scheduler)
+ uiRegistry.RegisterUI(uiProvider)
+
+ glog.V(1).Infof("✅ Registered erasure coding task templ UI provider")
+}