aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/admin/handlers/maintenance_handlers.go6
-rw-r--r--weed/worker/tasks/balance/balance_detector.go171
-rw-r--r--weed/worker/tasks/balance/balance_register.go78
-rw-r--r--weed/worker/tasks/balance/balance_scheduler.go197
-rw-r--r--weed/worker/tasks/balance/balance_v2.go274
-rw-r--r--weed/worker/tasks/balance/config_schema.go88
-rw-r--r--weed/worker/tasks/balance/schema_provider.go18
-rw-r--r--weed/worker/tasks/balance/ui.go115
-rw-r--r--weed/worker/tasks/base/generic_components.go129
-rw-r--r--weed/worker/tasks/base/registration.go131
-rw-r--r--weed/worker/tasks/base/task_definition.go95
-rw-r--r--weed/worker/tasks/erasure_coding/config_schema.go100
-rw-r--r--weed/worker/tasks/erasure_coding/ec_detector.go226
-rw-r--r--weed/worker/tasks/erasure_coding/ec_register.go87
-rw-r--r--weed/worker/tasks/erasure_coding/ec_scheduler.go95
-rw-r--r--weed/worker/tasks/erasure_coding/ec_v2.go301
-rw-r--r--weed/worker/tasks/erasure_coding/schema_provider.go18
-rw-r--r--weed/worker/tasks/erasure_coding/ui.go107
-rw-r--r--weed/worker/tasks/vacuum/config_schema.go112
-rw-r--r--weed/worker/tasks/vacuum/schema_provider.go18
-rw-r--r--weed/worker/tasks/vacuum/ui.go112
-rw-r--r--weed/worker/tasks/vacuum/vacuum_detector.go132
-rw-r--r--weed/worker/tasks/vacuum/vacuum_register.go78
-rw-r--r--weed/worker/tasks/vacuum/vacuum_scheduler.go111
-rw-r--r--weed/worker/tasks/vacuum/vacuum_v2.go269
-rw-r--r--weed/worker/tasks/vacuum_v2/vacuum.go260
26 files changed, 1468 insertions, 1860 deletions
diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go
index a587d1b96..6ff40a8db 100644
--- a/weed/admin/handlers/maintenance_handlers.go
+++ b/weed/admin/handlers/maintenance_handlers.go
@@ -192,11 +192,11 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
var config interface{}
switch taskType {
case types.TaskTypeVacuum:
- config = &vacuum.VacuumConfig{}
+ config = &vacuum.VacuumConfigV2{}
case types.TaskTypeBalance:
- config = &balance.BalanceConfig{}
+ config = &balance.BalanceConfigV2{}
case types.TaskTypeErasureCoding:
- config = &erasure_coding.ErasureCodingConfig{}
+ config = &erasure_coding.ErasureCodingConfigV2{}
default:
c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName})
return
diff --git a/weed/worker/tasks/balance/balance_detector.go b/weed/worker/tasks/balance/balance_detector.go
deleted file mode 100644
index f082b7a77..000000000
--- a/weed/worker/tasks/balance/balance_detector.go
+++ /dev/null
@@ -1,171 +0,0 @@
-package balance
-
-import (
- "fmt"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// BalanceDetector implements TaskDetector for balance tasks
-type BalanceDetector struct {
- enabled bool
- threshold float64 // Imbalance threshold (0.1 = 10%)
- minCheckInterval time.Duration
- minVolumeCount int
- lastCheck time.Time
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*BalanceDetector)(nil)
-)
-
-// NewBalanceDetector creates a new balance detector
-func NewBalanceDetector() *BalanceDetector {
- return &BalanceDetector{
- enabled: true,
- threshold: 0.1, // 10% imbalance threshold
- minCheckInterval: 1 * time.Hour,
- minVolumeCount: 10, // Don't balance small clusters
- lastCheck: time.Time{},
- }
-}
-
-// GetTaskType returns the task type
-func (d *BalanceDetector) GetTaskType() types.TaskType {
- return types.TaskTypeBalance
-}
-
-// ScanForTasks checks if cluster balance is needed
-func (d *BalanceDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- return nil, nil
- }
-
- glog.V(2).Infof("Scanning for balance tasks...")
-
- // Don't check too frequently
- if time.Since(d.lastCheck) < d.minCheckInterval {
- return nil, nil
- }
- d.lastCheck = time.Now()
-
- // Skip if cluster is too small
- if len(volumeMetrics) < d.minVolumeCount {
- glog.V(2).Infof("Cluster too small for balance (%d volumes < %d minimum)", len(volumeMetrics), d.minVolumeCount)
- return nil, nil
- }
-
- // Analyze volume distribution across servers
- serverVolumeCounts := make(map[string]int)
- for _, metric := range volumeMetrics {
- serverVolumeCounts[metric.Server]++
- }
-
- if len(serverVolumeCounts) < 2 {
- glog.V(2).Infof("Not enough servers for balance (%d servers)", len(serverVolumeCounts))
- return nil, nil
- }
-
- // Calculate balance metrics
- totalVolumes := len(volumeMetrics)
- avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
-
- maxVolumes := 0
- minVolumes := totalVolumes
- maxServer := ""
- minServer := ""
-
- for server, count := range serverVolumeCounts {
- if count > maxVolumes {
- maxVolumes = count
- maxServer = server
- }
- if count < minVolumes {
- minVolumes = count
- minServer = server
- }
- }
-
- // Check if imbalance exceeds threshold
- imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
- if imbalanceRatio <= d.threshold {
- glog.V(2).Infof("Cluster is balanced (imbalance ratio: %.2f <= %.2f)", imbalanceRatio, d.threshold)
- return nil, nil
- }
-
- // Create balance task
- reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
- imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
-
- task := &types.TaskDetectionResult{
- TaskType: types.TaskTypeBalance,
- Priority: types.TaskPriorityNormal,
- Reason: reason,
- ScheduleAt: time.Now(),
- Parameters: map[string]interface{}{
- "imbalance_ratio": imbalanceRatio,
- "threshold": d.threshold,
- "max_volumes": maxVolumes,
- "min_volumes": minVolumes,
- "avg_volumes_per_server": avgVolumesPerServer,
- "max_server": maxServer,
- "min_server": minServer,
- "total_servers": len(serverVolumeCounts),
- },
- }
-
- glog.V(1).Infof("πŸ”„ Found balance task: %s", reason)
- return []*types.TaskDetectionResult{task}, nil
-}
-
-// ScanInterval returns how often to scan
-func (d *BalanceDetector) ScanInterval() time.Duration {
- return d.minCheckInterval
-}
-
-// IsEnabled returns whether the detector is enabled
-func (d *BalanceDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// SetEnabled sets whether the detector is enabled
-func (d *BalanceDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
- glog.V(1).Infof("πŸ”„ Balance detector enabled: %v", enabled)
-}
-
-// SetThreshold sets the imbalance threshold
-func (d *BalanceDetector) SetThreshold(threshold float64) {
- d.threshold = threshold
- glog.V(1).Infof("πŸ”„ Balance threshold set to: %.1f%%", threshold*100)
-}
-
-// SetMinCheckInterval sets the minimum time between balance checks
-func (d *BalanceDetector) SetMinCheckInterval(interval time.Duration) {
- d.minCheckInterval = interval
- glog.V(1).Infof("πŸ”„ Balance check interval set to: %v", interval)
-}
-
-// SetMinVolumeCount sets the minimum volume count for balance operations
-func (d *BalanceDetector) SetMinVolumeCount(count int) {
- d.minVolumeCount = count
- glog.V(1).Infof("πŸ”„ Balance minimum volume count set to: %d", count)
-}
-
-// GetThreshold returns the current imbalance threshold
-func (d *BalanceDetector) GetThreshold() float64 {
- return d.threshold
-}
-
-// GetMinCheckInterval returns the minimum check interval
-func (d *BalanceDetector) GetMinCheckInterval() time.Duration {
- return d.minCheckInterval
-}
-
-// GetMinVolumeCount returns the minimum volume count
-func (d *BalanceDetector) GetMinVolumeCount() int {
- return d.minVolumeCount
-}
diff --git a/weed/worker/tasks/balance/balance_register.go b/weed/worker/tasks/balance/balance_register.go
index 7c2d5a520..def779389 100644
--- a/weed/worker/tasks/balance/balance_register.go
+++ b/weed/worker/tasks/balance/balance_register.go
@@ -1,81 +1,7 @@
package balance
-import (
- "fmt"
-
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// Factory creates balance task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
-
-// NewFactory creates a new balance task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeBalance,
- []string{"balance", "storage", "optimization"},
- "Balance data across volume servers for optimal performance",
- ),
- }
-}
-
-// Create creates a new balance task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID, params.Collection)
- task.SetEstimatedDuration(task.EstimateTime(params))
-
- return task, nil
-}
-
-// Shared detector and scheduler instances
-var (
- sharedDetector *BalanceDetector
- sharedScheduler *BalanceScheduler
-)
-
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*BalanceDetector, *BalanceScheduler) {
- if sharedDetector == nil {
- sharedDetector = NewBalanceDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewBalanceScheduler()
- }
- return sharedDetector, sharedScheduler
-}
-
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*BalanceDetector, *BalanceScheduler) {
- return getSharedInstances()
-}
-
// Auto-register this task when the package is imported
func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeBalance, factory)
-
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
-
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
-
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ // Use new architecture instead of old registration
+ initBalanceV2()
}
diff --git a/weed/worker/tasks/balance/balance_scheduler.go b/weed/worker/tasks/balance/balance_scheduler.go
deleted file mode 100644
index a8fefe465..000000000
--- a/weed/worker/tasks/balance/balance_scheduler.go
+++ /dev/null
@@ -1,197 +0,0 @@
-package balance
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// BalanceScheduler implements TaskScheduler for balance tasks
-type BalanceScheduler struct {
- enabled bool
- maxConcurrent int
- minInterval time.Duration
- lastScheduled map[string]time.Time // track when we last scheduled a balance for each task type
- minServerCount int
- moveDuringOffHours bool
- offHoursStart string
- offHoursEnd string
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskScheduler = (*BalanceScheduler)(nil)
-)
-
-// NewBalanceScheduler creates a new balance scheduler
-func NewBalanceScheduler() *BalanceScheduler {
- return &BalanceScheduler{
- enabled: true,
- maxConcurrent: 1, // Only run one balance at a time
- minInterval: 6 * time.Hour,
- lastScheduled: make(map[string]time.Time),
- minServerCount: 3,
- moveDuringOffHours: true,
- offHoursStart: "23:00",
- offHoursEnd: "06:00",
- }
-}
-
-// GetTaskType returns the task type
-func (s *BalanceScheduler) GetTaskType() types.TaskType {
- return types.TaskTypeBalance
-}
-
-// CanScheduleNow determines if a balance task can be scheduled
-func (s *BalanceScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- if !s.enabled {
- return false
- }
-
- // Count running balance tasks
- runningBalanceCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeBalance {
- runningBalanceCount++
- }
- }
-
- // Check concurrency limit
- if runningBalanceCount >= s.maxConcurrent {
- glog.V(3).Infof("⏸️ Balance task blocked: too many running (%d >= %d)", runningBalanceCount, s.maxConcurrent)
- return false
- }
-
- // Check minimum interval between balance operations
- if lastTime, exists := s.lastScheduled["balance"]; exists {
- if time.Since(lastTime) < s.minInterval {
- timeLeft := s.minInterval - time.Since(lastTime)
- glog.V(3).Infof("⏸️ Balance task blocked: too soon (wait %v)", timeLeft)
- return false
- }
- }
-
- // Check if we have available workers
- availableWorkerCount := 0
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeBalance {
- availableWorkerCount++
- break
- }
- }
- }
-
- if availableWorkerCount == 0 {
- glog.V(3).Infof("⏸️ Balance task blocked: no available workers")
- return false
- }
-
- // All checks passed - can schedule
- s.lastScheduled["balance"] = time.Now()
- glog.V(2).Infof("βœ… Balance task can be scheduled (running: %d/%d, workers: %d)",
- runningBalanceCount, s.maxConcurrent, availableWorkerCount)
- return true
-}
-
-// GetPriority returns the priority for balance tasks
-func (s *BalanceScheduler) GetPriority(task *types.Task) types.TaskPriority {
- // Balance is typically normal priority - not urgent but important for optimization
- return types.TaskPriorityNormal
-}
-
-// GetMaxConcurrent returns the maximum concurrent balance tasks
-func (s *BalanceScheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating balance tasks
-func (s *BalanceScheduler) GetDefaultRepeatInterval() time.Duration {
- return s.minInterval
-}
-
-// IsEnabled returns whether the scheduler is enabled
-func (s *BalanceScheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// SetEnabled sets whether the scheduler is enabled
-func (s *BalanceScheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
- glog.V(1).Infof("πŸ”„ Balance scheduler enabled: %v", enabled)
-}
-
-// SetMaxConcurrent sets the maximum concurrent balance tasks
-func (s *BalanceScheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
- glog.V(1).Infof("πŸ”„ Balance max concurrent set to: %d", max)
-}
-
-// SetMinInterval sets the minimum interval between balance operations
-func (s *BalanceScheduler) SetMinInterval(interval time.Duration) {
- s.minInterval = interval
- glog.V(1).Infof("πŸ”„ Balance minimum interval set to: %v", interval)
-}
-
-// GetLastScheduled returns when we last scheduled this task type
-func (s *BalanceScheduler) GetLastScheduled(taskKey string) time.Time {
- if lastTime, exists := s.lastScheduled[taskKey]; exists {
- return lastTime
- }
- return time.Time{}
-}
-
-// SetLastScheduled updates when we last scheduled this task type
-func (s *BalanceScheduler) SetLastScheduled(taskKey string, when time.Time) {
- s.lastScheduled[taskKey] = when
-}
-
-// GetMinServerCount returns the minimum server count
-func (s *BalanceScheduler) GetMinServerCount() int {
- return s.minServerCount
-}
-
-// SetMinServerCount sets the minimum server count
-func (s *BalanceScheduler) SetMinServerCount(count int) {
- s.minServerCount = count
- glog.V(1).Infof("πŸ”„ Balance minimum server count set to: %d", count)
-}
-
-// GetMoveDuringOffHours returns whether to move only during off-hours
-func (s *BalanceScheduler) GetMoveDuringOffHours() bool {
- return s.moveDuringOffHours
-}
-
-// SetMoveDuringOffHours sets whether to move only during off-hours
-func (s *BalanceScheduler) SetMoveDuringOffHours(enabled bool) {
- s.moveDuringOffHours = enabled
- glog.V(1).Infof("πŸ”„ Balance move during off-hours: %v", enabled)
-}
-
-// GetOffHoursStart returns the off-hours start time
-func (s *BalanceScheduler) GetOffHoursStart() string {
- return s.offHoursStart
-}
-
-// SetOffHoursStart sets the off-hours start time
-func (s *BalanceScheduler) SetOffHoursStart(start string) {
- s.offHoursStart = start
- glog.V(1).Infof("πŸ”„ Balance off-hours start time set to: %s", start)
-}
-
-// GetOffHoursEnd returns the off-hours end time
-func (s *BalanceScheduler) GetOffHoursEnd() string {
- return s.offHoursEnd
-}
-
-// SetOffHoursEnd sets the off-hours end time
-func (s *BalanceScheduler) SetOffHoursEnd(end string) {
- s.offHoursEnd = end
- glog.V(1).Infof("πŸ”„ Balance off-hours end time set to: %s", end)
-}
-
-// GetMinInterval returns the minimum interval
-func (s *BalanceScheduler) GetMinInterval() time.Duration {
- return s.minInterval
-}
diff --git a/weed/worker/tasks/balance/balance_v2.go b/weed/worker/tasks/balance/balance_v2.go
new file mode 100644
index 000000000..cb66d26d8
--- /dev/null
+++ b/weed/worker/tasks/balance/balance_v2.go
@@ -0,0 +1,274 @@
+package balance
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// BalanceConfigV2 extends BaseConfig with balance-specific settings
+type BalanceConfigV2 struct {
+ base.BaseConfig
+ ImbalanceThreshold float64 `json:"imbalance_threshold"`
+ MinServerCount int `json:"min_server_count"`
+}
+
+// ToMap converts config to map (extend base functionality)
+func (c *BalanceConfigV2) ToMap() map[string]interface{} {
+ result := c.BaseConfig.ToMap()
+ result["imbalance_threshold"] = c.ImbalanceThreshold
+ result["min_server_count"] = c.MinServerCount
+ return result
+}
+
+// FromMap loads config from map (extend base functionality)
+func (c *BalanceConfigV2) FromMap(data map[string]interface{}) error {
+ // Load base config first
+ if err := c.BaseConfig.FromMap(data); err != nil {
+ return err
+ }
+
+ // Load balance-specific config
+ if threshold, ok := data["imbalance_threshold"].(float64); ok {
+ c.ImbalanceThreshold = threshold
+ }
+ if serverCount, ok := data["min_server_count"].(int); ok {
+ c.MinServerCount = serverCount
+ }
+ return nil
+}
+
+// balanceDetection implements the detection logic for balance tasks
+func balanceDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ balanceConfig := config.(*BalanceConfigV2)
+
+ // Skip if cluster is too small
+ minVolumeCount := 10
+ if len(metrics) < minVolumeCount {
+ return nil, nil
+ }
+
+ // Analyze volume distribution across servers
+ serverVolumeCounts := make(map[string]int)
+ for _, metric := range metrics {
+ serverVolumeCounts[metric.Server]++
+ }
+
+ if len(serverVolumeCounts) < balanceConfig.MinServerCount {
+ return nil, nil
+ }
+
+ // Calculate balance metrics
+ totalVolumes := len(metrics)
+ avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
+
+ maxVolumes := 0
+ minVolumes := totalVolumes
+ maxServer := ""
+ minServer := ""
+
+ for server, count := range serverVolumeCounts {
+ if count > maxVolumes {
+ maxVolumes = count
+ maxServer = server
+ }
+ if count < minVolumes {
+ minVolumes = count
+ minServer = server
+ }
+ }
+
+ // Check if imbalance exceeds threshold
+ imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
+ if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
+ return nil, nil
+ }
+
+ // Create balance task
+ reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
+ imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+
+ task := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeBalance,
+ Priority: types.TaskPriorityNormal,
+ Reason: reason,
+ ScheduleAt: time.Now(),
+ Parameters: map[string]interface{}{
+ "imbalance_ratio": imbalanceRatio,
+ "threshold": balanceConfig.ImbalanceThreshold,
+ "max_volumes": maxVolumes,
+ "min_volumes": minVolumes,
+ "avg_volumes_per_server": avgVolumesPerServer,
+ "max_server": maxServer,
+ "min_server": minServer,
+ "total_servers": len(serverVolumeCounts),
+ },
+ }
+
+ return []*types.TaskDetectionResult{task}, nil
+}
+
+// balanceScheduling implements the scheduling logic for balance tasks
+func balanceScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ balanceConfig := config.(*BalanceConfigV2)
+
+ // Count running balance tasks
+ runningBalanceCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeBalance {
+ runningBalanceCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningBalanceCount >= balanceConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check if we have available workers
+ availableWorkerCount := 0
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeBalance {
+ availableWorkerCount++
+ break
+ }
+ }
+ }
+
+ return availableWorkerCount > 0
+}
+
+// createBalanceTask creates a balance task instance
+func createBalanceTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Validate parameters
+ if params.VolumeID == 0 {
+ return nil, fmt.Errorf("volume_id is required")
+ }
+ if params.Server == "" {
+ return nil, fmt.Errorf("server is required")
+ }
+
+ task := NewTask(params.Server, params.VolumeID, params.Collection)
+ task.SetEstimatedDuration(task.EstimateTime(params))
+ return task, nil
+}
+
+// getBalanceConfigSpec returns the configuration schema for balance tasks
+func getBalanceConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Balance Tasks",
+ Description: "Whether balance tasks should be automatically created",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "imbalance_threshold",
+ JSONName: "imbalance_threshold",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.1, // 10%
+ MinValue: 0.01,
+ MaxValue: 0.5,
+ Required: true,
+ DisplayName: "Imbalance Threshold",
+ Description: "Trigger balance when storage imbalance exceeds this ratio",
+ Placeholder: "0.10 (10%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 6 * 60 * 60, // 6 hours
+ MinValue: 1 * 60 * 60, // 1 hour
+ MaxValue: 24 * 60 * 60, // 24 hours
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for imbalanced volumes",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 2,
+ MinValue: 1,
+ MaxValue: 5,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of balance tasks that can run simultaneously",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_server_count",
+ JSONName: "min_server_count",
+ Type: config.FieldTypeInt,
+ DefaultValue: 3,
+ MinValue: 2,
+ MaxValue: 20,
+ Required: true,
+ DisplayName: "Minimum Server Count",
+ Description: "Only balance when at least this many servers are available",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
+
+// initBalanceV2 registers the refactored balance task
+func initBalanceV2() {
+ // Create configuration instance
+ config := &BalanceConfigV2{
+ BaseConfig: base.BaseConfig{
+ Enabled: false, // Conservative default
+ ScanIntervalSeconds: 6 * 60 * 60, // 6 hours
+ MaxConcurrent: 2,
+ },
+ ImbalanceThreshold: 0.1, // 10%
+ MinServerCount: 3,
+ }
+
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeBalance,
+ Name: "balance",
+ DisplayName: "Volume Balance",
+ Description: "Redistributes volumes across volume servers to optimize storage utilization",
+ Icon: "fas fa-balance-scale text-secondary",
+ Capabilities: []string{"balance", "storage", "optimization"},
+
+ Config: config,
+ ConfigSpec: getBalanceConfigSpec(),
+ CreateTask: createBalanceTask,
+ DetectionFunc: balanceDetection,
+ ScanInterval: 6 * time.Hour,
+ SchedulingFunc: balanceScheduling,
+ MaxConcurrent: 2,
+ RepeatInterval: 12 * time.Hour,
+ }
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
+}
diff --git a/weed/worker/tasks/balance/config_schema.go b/weed/worker/tasks/balance/config_schema.go
deleted file mode 100644
index 91c5714d7..000000000
--- a/weed/worker/tasks/balance/config_schema.go
+++ /dev/null
@@ -1,88 +0,0 @@
-package balance
-
-import (
- "github.com/seaweedfs/seaweedfs/weed/admin/config"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
-)
-
-// GetConfigSchema returns the schema for balance task configuration
-func GetConfigSchema() *tasks.TaskConfigSchema {
- return &tasks.TaskConfigSchema{
- TaskName: "balance",
- DisplayName: "Volume Balance",
- Description: "Redistributes volumes across volume servers to optimize storage utilization",
- Icon: "fas fa-balance-scale text-secondary",
- Schema: config.Schema{
- Fields: []*config.Field{
- {
- Name: "enabled",
- JSONName: "enabled",
- Type: config.FieldTypeBool,
- DefaultValue: true,
- Required: false,
- DisplayName: "Enable Balance Tasks",
- Description: "Whether balance tasks should be automatically created",
- InputType: "checkbox",
- CSSClasses: "form-check-input",
- },
- {
- Name: "imbalance_threshold",
- JSONName: "imbalance_threshold",
- Type: config.FieldTypeFloat,
- DefaultValue: 0.1, // 10%
- MinValue: 0.01,
- MaxValue: 0.5,
- Required: true,
- DisplayName: "Imbalance Threshold",
- Description: "Trigger balance when storage imbalance exceeds this ratio",
- Placeholder: "0.10 (10%)",
- Unit: config.UnitNone,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "scan_interval_seconds",
- JSONName: "scan_interval_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 6 * 60 * 60, // 6 hours
- MinValue: 1 * 60 * 60, // 1 hour
- MaxValue: 24 * 60 * 60, // 24 hours
- Required: true,
- DisplayName: "Scan Interval",
- Description: "How often to scan for imbalanced volumes",
- Unit: config.UnitHours,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "max_concurrent",
- JSONName: "max_concurrent",
- Type: config.FieldTypeInt,
- DefaultValue: 2,
- MinValue: 1,
- MaxValue: 5,
- Required: true,
- DisplayName: "Max Concurrent Tasks",
- Description: "Maximum number of balance tasks that can run simultaneously",
- Unit: config.UnitCount,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "min_server_count",
- JSONName: "min_server_count",
- Type: config.FieldTypeInt,
- DefaultValue: 3,
- MinValue: 2,
- MaxValue: 20,
- Required: true,
- DisplayName: "Minimum Server Count",
- Description: "Only balance when at least this many servers are available",
- Unit: config.UnitCount,
- InputType: "number",
- CSSClasses: "form-control",
- },
- },
- },
- }
-}
diff --git a/weed/worker/tasks/balance/schema_provider.go b/weed/worker/tasks/balance/schema_provider.go
deleted file mode 100644
index a208148f0..000000000
--- a/weed/worker/tasks/balance/schema_provider.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package balance
-
-import (
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
-)
-
-// SchemaProvider implements the TaskConfigSchemaProvider interface for balance tasks
-type SchemaProvider struct{}
-
-// GetConfigSchema returns the schema for balance task configuration
-func (p *SchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
- return GetConfigSchema()
-}
-
-// init registers the balance schema provider
-func init() {
- tasks.RegisterTaskConfigSchema("balance", &SchemaProvider{})
-}
diff --git a/weed/worker/tasks/balance/ui.go b/weed/worker/tasks/balance/ui.go
deleted file mode 100644
index 6e34b68a2..000000000
--- a/weed/worker/tasks/balance/ui.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package balance
-
-import (
- "fmt"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// BalanceConfig represents the balance configuration matching the schema
-type BalanceConfig struct {
- Enabled bool `json:"enabled"`
- ImbalanceThreshold float64 `json:"imbalance_threshold"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- MinServerCount int `json:"min_server_count"`
-}
-
-// BalanceUILogic contains the business logic for balance UI operations
-type BalanceUILogic struct {
- detector *BalanceDetector
- scheduler *BalanceScheduler
-}
-
-// NewBalanceUILogic creates new balance UI logic
-func NewBalanceUILogic(detector *BalanceDetector, scheduler *BalanceScheduler) *BalanceUILogic {
- return &BalanceUILogic{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetCurrentConfig returns the current balance configuration
-func (logic *BalanceUILogic) GetCurrentConfig() interface{} {
- config := &BalanceConfig{
- // Default values from schema (matching task_config_schema.go)
- Enabled: true,
- ImbalanceThreshold: 0.1, // 10%
- ScanIntervalSeconds: 6 * 60 * 60, // 6 hours
- MaxConcurrent: 2,
- MinServerCount: 3,
- }
-
- // Get current values from detector
- if logic.detector != nil {
- config.Enabled = logic.detector.IsEnabled()
- config.ImbalanceThreshold = logic.detector.GetThreshold()
- config.ScanIntervalSeconds = int(logic.detector.ScanInterval().Seconds())
- }
-
- // Get current values from scheduler
- if logic.scheduler != nil {
- config.MaxConcurrent = logic.scheduler.GetMaxConcurrent()
- config.MinServerCount = logic.scheduler.GetMinServerCount()
- }
-
- return config
-}
-
-// ApplyConfig applies the balance configuration
-func (logic *BalanceUILogic) ApplyConfig(config interface{}) error {
- balanceConfig, ok := config.(*BalanceConfig)
- if !ok {
- return fmt.Errorf("invalid configuration type for balance")
- }
-
- // Apply to detector
- if logic.detector != nil {
- logic.detector.SetEnabled(balanceConfig.Enabled)
- logic.detector.SetThreshold(balanceConfig.ImbalanceThreshold)
- logic.detector.SetMinCheckInterval(time.Duration(balanceConfig.ScanIntervalSeconds) * time.Second)
- }
-
- // Apply to scheduler
- if logic.scheduler != nil {
- logic.scheduler.SetEnabled(balanceConfig.Enabled)
- logic.scheduler.SetMaxConcurrent(balanceConfig.MaxConcurrent)
- logic.scheduler.SetMinServerCount(balanceConfig.MinServerCount)
- }
-
- glog.V(1).Infof("Applied balance configuration: enabled=%v, threshold=%.1f%%, max_concurrent=%d, min_servers=%d",
- balanceConfig.Enabled, balanceConfig.ImbalanceThreshold*100, balanceConfig.MaxConcurrent,
- balanceConfig.MinServerCount)
-
- return nil
-}
-
-// RegisterUI registers the balance UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *BalanceDetector, scheduler *BalanceScheduler) {
- logic := NewBalanceUILogic(detector, scheduler)
-
- tasks.CommonRegisterUI(
- types.TaskTypeBalance,
- "Volume Balance",
- uiRegistry,
- detector,
- scheduler,
- GetConfigSchema,
- logic.GetCurrentConfig,
- logic.ApplyConfig,
- )
-}
-
-// DefaultBalanceConfig returns default balance configuration
-func DefaultBalanceConfig() *BalanceConfig {
- return &BalanceConfig{
- Enabled: false,
- ImbalanceThreshold: 0.1, // 10%
- ScanIntervalSeconds: 6 * 60 * 60, // 6 hours
- MaxConcurrent: 2,
- MinServerCount: 3,
- }
-}
diff --git a/weed/worker/tasks/base/generic_components.go b/weed/worker/tasks/base/generic_components.go
new file mode 100644
index 000000000..27ad1bb29
--- /dev/null
+++ b/weed/worker/tasks/base/generic_components.go
@@ -0,0 +1,129 @@
+package base
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// GenericDetector implements TaskDetector using function-based logic
+type GenericDetector struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericDetector creates a detector from a task definition
+func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector {
+ return &GenericDetector{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (d *GenericDetector) GetTaskType() types.TaskType {
+ return d.taskDef.Type
+}
+
+// ScanForTasks scans using the task definition's detection function
+func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
+ if d.taskDef.DetectionFunc == nil {
+ return nil, nil
+ }
+ return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config)
+}
+
+// ScanInterval returns the scan interval from task definition
+func (d *GenericDetector) ScanInterval() time.Duration {
+ if d.taskDef.ScanInterval > 0 {
+ return d.taskDef.ScanInterval
+ }
+ return 30 * time.Minute // Default
+}
+
+// IsEnabled returns whether this detector is enabled
+func (d *GenericDetector) IsEnabled() bool {
+ return d.taskDef.Config.IsEnabled()
+}
+
+// GenericScheduler implements TaskScheduler using function-based logic
+type GenericScheduler struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericScheduler creates a scheduler from a task definition
+func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler {
+ return &GenericScheduler{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (s *GenericScheduler) GetTaskType() types.TaskType {
+ return s.taskDef.Type
+}
+
+// CanScheduleNow determines if a task can be scheduled using the task definition's function
+func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if s.taskDef.SchedulingFunc == nil {
+ return s.defaultCanSchedule(task, runningTasks, availableWorkers)
+ }
+ return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config)
+}
+
+// defaultCanSchedule provides default scheduling logic
+func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if !s.taskDef.Config.IsEnabled() {
+ return false
+ }
+
+ // Count running tasks of this type
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == s.taskDef.Type {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ maxConcurrent := s.taskDef.MaxConcurrent
+ if maxConcurrent <= 0 {
+ maxConcurrent = 1 // Default
+ }
+ if runningCount >= maxConcurrent {
+ return false
+ }
+
+ // Check if we have available workers
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == s.taskDef.Type {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// GetPriority returns the priority for this task
+func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority {
+ return task.Priority
+}
+
+// GetMaxConcurrent returns max concurrent tasks
+func (s *GenericScheduler) GetMaxConcurrent() int {
+ if s.taskDef.MaxConcurrent > 0 {
+ return s.taskDef.MaxConcurrent
+ }
+ return 1 // Default
+}
+
+// GetDefaultRepeatInterval returns the default repeat interval
+func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration {
+ if s.taskDef.RepeatInterval > 0 {
+ return s.taskDef.RepeatInterval
+ }
+ return 24 * time.Hour // Default
+}
+
+// IsEnabled returns whether this scheduler is enabled
+func (s *GenericScheduler) IsEnabled() bool {
+ return s.taskDef.Config.IsEnabled()
+}
diff --git a/weed/worker/tasks/base/registration.go b/weed/worker/tasks/base/registration.go
new file mode 100644
index 000000000..6a91feac1
--- /dev/null
+++ b/weed/worker/tasks/base/registration.go
@@ -0,0 +1,131 @@
+package base
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// GenericFactory creates task instances using a TaskDefinition
+type GenericFactory struct {
+ *tasks.BaseTaskFactory
+ taskDef *TaskDefinition
+}
+
+// NewGenericFactory creates a generic task factory
+func NewGenericFactory(taskDef *TaskDefinition) *GenericFactory {
+ return &GenericFactory{
+ BaseTaskFactory: tasks.NewBaseTaskFactory(
+ taskDef.Type,
+ taskDef.Capabilities,
+ taskDef.Description,
+ ),
+ taskDef: taskDef,
+ }
+}
+
+// Create creates a task instance using the task definition
+func (f *GenericFactory) Create(params types.TaskParams) (types.TaskInterface, error) {
+ if f.taskDef.CreateTask == nil {
+ return nil, fmt.Errorf("no task creation function defined for %s", f.taskDef.Type)
+ }
+ return f.taskDef.CreateTask(params)
+}
+
+// GenericSchemaProvider provides config schema from TaskDefinition
+type GenericSchemaProvider struct {
+ taskDef *TaskDefinition
+}
+
+// GetConfigSchema returns the schema from task definition
+func (p *GenericSchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
+ return &tasks.TaskConfigSchema{
+ TaskName: string(p.taskDef.Type),
+ DisplayName: p.taskDef.DisplayName,
+ Description: p.taskDef.Description,
+ Icon: p.taskDef.Icon,
+ Schema: config.Schema{
+ Fields: p.taskDef.ConfigSpec.Fields,
+ },
+ }
+}
+
+// GenericUIProvider provides UI functionality from TaskDefinition
+type GenericUIProvider struct {
+ taskDef *TaskDefinition
+}
+
+// GetCurrentConfig returns current config as interface{}
+func (ui *GenericUIProvider) GetCurrentConfig() interface{} {
+ return ui.taskDef.Config
+}
+
+// ApplyConfig applies configuration
+func (ui *GenericUIProvider) ApplyConfig(config interface{}) error {
+ if configMap, ok := config.(map[string]interface{}); ok {
+ return ui.taskDef.Config.FromMap(configMap)
+ }
+ return fmt.Errorf("invalid config format for %s", ui.taskDef.Type)
+}
+
+// RegisterTask registers a complete task definition with all registries
+func RegisterTask(taskDef *TaskDefinition) {
+ // Validate task definition
+ if err := validateTaskDefinition(taskDef); err != nil {
+ glog.Errorf("Invalid task definition for %s: %v", taskDef.Type, err)
+ return
+ }
+
+ // Create and register factory
+ factory := NewGenericFactory(taskDef)
+ tasks.AutoRegister(taskDef.Type, factory)
+
+ // Create and register detector/scheduler
+ detector := NewGenericDetector(taskDef)
+ scheduler := NewGenericScheduler(taskDef)
+
+ tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
+ registry.RegisterTask(detector, scheduler)
+ })
+
+ // Create and register schema provider
+ schemaProvider := &GenericSchemaProvider{taskDef: taskDef}
+ tasks.RegisterTaskConfigSchema(string(taskDef.Type), schemaProvider)
+
+ // Create and register UI provider
+ uiProvider := &GenericUIProvider{taskDef: taskDef}
+ tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
+ baseUIProvider := tasks.NewBaseUIProvider(
+ taskDef.Type,
+ taskDef.DisplayName,
+ taskDef.Description,
+ taskDef.Icon,
+ schemaProvider.GetConfigSchema,
+ uiProvider.GetCurrentConfig,
+ uiProvider.ApplyConfig,
+ )
+ uiRegistry.RegisterUI(baseUIProvider)
+ })
+
+ glog.V(1).Infof("βœ… Registered complete task definition: %s", taskDef.Type)
+}
+
+// validateTaskDefinition ensures the task definition is complete
+func validateTaskDefinition(taskDef *TaskDefinition) error {
+ if taskDef.Type == "" {
+ return fmt.Errorf("task type is required")
+ }
+ if taskDef.Name == "" {
+ return fmt.Errorf("task name is required")
+ }
+ if taskDef.Config == nil {
+ return fmt.Errorf("task config is required")
+ }
+ if taskDef.CreateTask == nil {
+ return fmt.Errorf("task creation function is required")
+ }
+ return nil
+}
diff --git a/weed/worker/tasks/base/task_definition.go b/weed/worker/tasks/base/task_definition.go
new file mode 100644
index 000000000..c32471664
--- /dev/null
+++ b/weed/worker/tasks/base/task_definition.go
@@ -0,0 +1,95 @@
+package base
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TaskDefinition encapsulates everything needed to define a complete task type
+type TaskDefinition struct {
+ // Basic task information
+ Type types.TaskType
+ Name string
+ DisplayName string
+ Description string
+ Icon string
+ Capabilities []string
+
+ // Task configuration
+ Config TaskConfig
+ ConfigSpec ConfigSpec
+
+ // Task creation
+ CreateTask func(params types.TaskParams) (types.TaskInterface, error)
+
+ // Detection logic
+ DetectionFunc func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config TaskConfig) ([]*types.TaskDetectionResult, error)
+ ScanInterval time.Duration
+
+ // Scheduling logic
+ SchedulingFunc func(task *types.Task, running []*types.Task, workers []*types.Worker, config TaskConfig) bool
+ MaxConcurrent int
+ RepeatInterval time.Duration
+}
+
+// TaskConfig provides a simple configuration interface
+type TaskConfig interface {
+ IsEnabled() bool
+ SetEnabled(bool)
+ Validate() error
+ ToMap() map[string]interface{}
+ FromMap(map[string]interface{}) error
+}
+
+// ConfigSpec defines the configuration schema
+type ConfigSpec struct {
+ Fields []*config.Field
+}
+
+// BaseConfig provides common configuration fields
+type BaseConfig struct {
+ Enabled bool `json:"enabled"`
+ ScanIntervalSeconds int `json:"scan_interval_seconds"`
+ MaxConcurrent int `json:"max_concurrent"`
+}
+
+// IsEnabled returns whether the task is enabled
+func (c *BaseConfig) IsEnabled() bool {
+ return c.Enabled
+}
+
+// SetEnabled sets whether the task is enabled
+func (c *BaseConfig) SetEnabled(enabled bool) {
+ c.Enabled = enabled
+}
+
+// Validate validates the base configuration
+func (c *BaseConfig) Validate() error {
+ // Common validation logic
+ return nil
+}
+
+// ToMap converts config to map
+func (c *BaseConfig) ToMap() map[string]interface{} {
+ return map[string]interface{}{
+ "enabled": c.Enabled,
+ "scan_interval_seconds": c.ScanIntervalSeconds,
+ "max_concurrent": c.MaxConcurrent,
+ }
+}
+
+// FromMap loads config from map
+func (c *BaseConfig) FromMap(data map[string]interface{}) error {
+ if enabled, ok := data["enabled"].(bool); ok {
+ c.Enabled = enabled
+ }
+ if interval, ok := data["scan_interval_seconds"].(int); ok {
+ c.ScanIntervalSeconds = interval
+ }
+ if concurrent, ok := data["max_concurrent"].(int); ok {
+ c.MaxConcurrent = concurrent
+ }
+ return nil
+}
diff --git a/weed/worker/tasks/erasure_coding/config_schema.go b/weed/worker/tasks/erasure_coding/config_schema.go
deleted file mode 100644
index 24b96137c..000000000
--- a/weed/worker/tasks/erasure_coding/config_schema.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package erasure_coding
-
-import (
- "github.com/seaweedfs/seaweedfs/weed/admin/config"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
-)
-
-// GetConfigSchema returns the schema for erasure coding task configuration
-func GetConfigSchema() *tasks.TaskConfigSchema {
- return &tasks.TaskConfigSchema{
- TaskName: "erasure_coding",
- DisplayName: "Erasure Coding",
- Description: "Converts volumes to erasure coded format for improved data durability",
- Icon: "fas fa-shield-alt text-info",
- Schema: config.Schema{
- Fields: []*config.Field{
- {
- Name: "enabled",
- JSONName: "enabled",
- Type: config.FieldTypeBool,
- DefaultValue: true,
- Required: false,
- DisplayName: "Enable Erasure Coding Tasks",
- Description: "Whether erasure coding tasks should be automatically created",
- InputType: "checkbox",
- CSSClasses: "form-check-input",
- },
- {
- Name: "quiet_for_seconds",
- JSONName: "quiet_for_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 7 * 24 * 60 * 60, // 7 days
- MinValue: 1 * 24 * 60 * 60, // 1 day
- MaxValue: 30 * 24 * 60 * 60, // 30 days
- Required: true,
- DisplayName: "Quiet For Duration",
- Description: "Only apply erasure coding to volumes that have not been modified for this duration",
- Unit: config.UnitDays,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "scan_interval_seconds",
- JSONName: "scan_interval_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 12 * 60 * 60, // 12 hours
- MinValue: 2 * 60 * 60, // 2 hours
- MaxValue: 24 * 60 * 60, // 24 hours
- Required: true,
- DisplayName: "Scan Interval",
- Description: "How often to scan for volumes needing erasure coding",
- Unit: config.UnitHours,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "max_concurrent",
- JSONName: "max_concurrent",
- Type: config.FieldTypeInt,
- DefaultValue: 1,
- MinValue: 1,
- MaxValue: 3,
- Required: true,
- DisplayName: "Max Concurrent Tasks",
- Description: "Maximum number of erasure coding tasks that can run simultaneously",
- Unit: config.UnitCount,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "fullness_ratio",
- JSONName: "fullness_ratio",
- Type: config.FieldTypeFloat,
- DefaultValue: 0.9, // 90%
- MinValue: 0.5,
- MaxValue: 1.0,
- Required: true,
- DisplayName: "Fullness Ratio",
- Description: "Only apply erasure coding to volumes with fullness ratio above this threshold",
- Placeholder: "0.90 (90%)",
- Unit: config.UnitNone,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "collection_filter",
- JSONName: "collection_filter",
- Type: config.FieldTypeString,
- DefaultValue: "",
- Required: false,
- DisplayName: "Collection Filter",
- Description: "Only apply erasure coding to volumes in these collections (comma-separated, leave empty for all)",
- Placeholder: "collection1,collection2",
- InputType: "text",
- CSSClasses: "form-control",
- },
- },
- },
- }
-}
diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go
deleted file mode 100644
index 5fd13845f..000000000
--- a/weed/worker/tasks/erasure_coding/ec_detector.go
+++ /dev/null
@@ -1,226 +0,0 @@
-package erasure_coding
-
-import (
- "fmt"
- "strings"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// EcDetector implements erasure coding task detection
-type EcDetector struct {
- enabled bool
- quietForSeconds int
- fullnessRatio float64
- minSizeMB int // Minimum volume size in MB before considering EC
- scanInterval time.Duration
- collectionFilter string
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*EcDetector)(nil)
- _ types.PolicyConfigurableDetector = (*EcDetector)(nil)
-)
-
-// NewEcDetector creates a new erasure coding detector with production defaults
-func NewEcDetector() *EcDetector {
- return &EcDetector{
- enabled: false, // Conservative default - enable via configuration
- quietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period
- fullnessRatio: 0.90, // 90% full threshold
- minSizeMB: 100, // Minimum 100MB volume size
- scanInterval: 12 * time.Hour, // Scan every 12 hours
- collectionFilter: "", // No collection filter by default
- }
-}
-
-// GetTaskType returns the task type
-func (d *EcDetector) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// ScanForTasks scans for volumes that should be converted to erasure coding
-func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- glog.V(2).Infof("EC detector is disabled")
- return nil, nil
- }
-
- var results []*types.TaskDetectionResult
- now := time.Now()
- quietThreshold := time.Duration(d.quietForSeconds) * time.Second
- minSizeBytes := uint64(d.minSizeMB) * 1024 * 1024
-
- glog.V(2).Infof("EC detector scanning %d volumes with thresholds: quietFor=%ds, fullness=%.2f, minSize=%dMB",
- len(volumeMetrics), d.quietForSeconds, d.fullnessRatio, d.minSizeMB)
-
- for _, metric := range volumeMetrics {
- // Skip if already EC volume
- if metric.IsECVolume {
- continue
- }
-
- // Check minimum size requirement
- if metric.Size < minSizeBytes {
- continue
- }
-
- // Check collection filter if specified
- if d.collectionFilter != "" {
- // Parse comma-separated collections
- allowedCollections := make(map[string]bool)
- for _, collection := range strings.Split(d.collectionFilter, ",") {
- allowedCollections[strings.TrimSpace(collection)] = true
- }
- // Skip if volume's collection is not in the allowed list
- if !allowedCollections[metric.Collection] {
- continue
- }
- }
-
- // Check quiet duration and fullness criteria
- if metric.Age >= quietThreshold && metric.FullnessRatio >= d.fullnessRatio {
- // Note: Removed read-only requirement for testing
- // In production, you might want to enable this:
- // if !metric.IsReadOnly {
- // continue
- // }
-
- result := &types.TaskDetectionResult{
- TaskType: types.TaskTypeErasureCoding,
- VolumeID: metric.VolumeID,
- Server: metric.Server,
- Collection: metric.Collection,
- Priority: types.TaskPriorityLow, // EC is not urgent
- Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
- metric.Age.Seconds(), d.quietForSeconds, metric.FullnessRatio*100, d.fullnessRatio*100,
- float64(metric.Size)/(1024*1024), d.minSizeMB),
- Parameters: map[string]interface{}{
- "age_seconds": int(metric.Age.Seconds()),
- "fullness_ratio": metric.FullnessRatio,
- "size_mb": int(metric.Size / (1024 * 1024)),
- },
- ScheduleAt: now,
- }
- results = append(results, result)
-
- glog.V(1).Infof("EC task detected for volume %d on %s: %s", metric.VolumeID, metric.Server, result.Reason)
- }
- }
-
- glog.V(1).Infof("EC detector found %d tasks to schedule", len(results))
- return results, nil
-}
-
-// ScanInterval returns how often this task type should be scanned
-func (d *EcDetector) ScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// IsEnabled returns whether this task type is enabled
-func (d *EcDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// Configuration methods for runtime configuration
-
-// Configure sets detector configuration from policy
-func (d *EcDetector) Configure(config map[string]interface{}) error {
- if enabled, ok := config["enabled"].(bool); ok {
- d.enabled = enabled
- }
-
- if ageSeconds, ok := config["quiet_for_seconds"].(float64); ok {
- d.quietForSeconds = int(ageSeconds)
- }
-
- if fullnessRatio, ok := config["fullness_ratio"].(float64); ok {
- d.fullnessRatio = fullnessRatio
- }
-
- if minSizeMB, ok := config["min_size_mb"].(float64); ok {
- d.minSizeMB = int(minSizeMB)
- }
-
- if collectionFilter, ok := config["collection_filter"].(string); ok {
- d.collectionFilter = collectionFilter
- }
-
- glog.V(1).Infof("EC detector configured: enabled=%v, quietFor=%ds, fullness=%.2f, minSize=%dMB, collection_filter='%s'",
- d.enabled, d.quietForSeconds, d.fullnessRatio, d.minSizeMB, d.collectionFilter)
-
- return nil
-}
-
-// SetEnabled sets whether the detector is enabled
-func (d *EcDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
-}
-
-// SetQuietForSeconds sets the quiet duration threshold in seconds
-func (d *EcDetector) SetQuietForSeconds(seconds int) {
- d.quietForSeconds = seconds
-}
-
-// SetFullnessRatio sets the fullness ratio threshold
-func (d *EcDetector) SetFullnessRatio(ratio float64) {
- d.fullnessRatio = ratio
-}
-
-// SetCollectionFilter sets the collection filter
-func (d *EcDetector) SetCollectionFilter(filter string) {
- d.collectionFilter = filter
-}
-
-// SetScanInterval sets the scan interval
-func (d *EcDetector) SetScanInterval(interval time.Duration) {
- d.scanInterval = interval
-}
-
-// GetQuietForSeconds returns the current quiet duration threshold in seconds
-func (d *EcDetector) GetQuietForSeconds() int {
- return d.quietForSeconds
-}
-
-// GetFullnessRatio returns the current fullness ratio threshold
-func (d *EcDetector) GetFullnessRatio() float64 {
- return d.fullnessRatio
-}
-
-// GetCollectionFilter returns the current collection filter
-func (d *EcDetector) GetCollectionFilter() string {
- return d.collectionFilter
-}
-
-// GetScanInterval returns the scan interval
-func (d *EcDetector) GetScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// ConfigureFromPolicy configures the detector from maintenance policy
-func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
- // Cast policy to maintenance policy type
- if maintenancePolicy, ok := policy.(*types.MaintenancePolicy); ok {
- // Get EC-specific configuration from policy
- ecConfig := maintenancePolicy.GetTaskConfig(types.TaskTypeErasureCoding)
-
- if ecConfig != nil {
- // Convert to map for easier access
- if configMap, ok := ecConfig.(map[string]interface{}); ok {
- d.Configure(configMap)
- } else {
- glog.Warningf("EC detector policy configuration is not a map: %T", ecConfig)
- }
- } else {
- // No specific configuration found, use defaults with policy-based enabled status
- enabled := maintenancePolicy.GlobalSettings != nil && maintenancePolicy.GlobalSettings.MaintenanceEnabled
- glog.V(2).Infof("No EC-specific config found, using default with enabled=%v", enabled)
- d.enabled = enabled
- }
- } else {
- glog.Warningf("ConfigureFromPolicy received unknown policy type: %T", policy)
- }
-}
diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go
index 4fe7e47bf..f8aed927f 100644
--- a/weed/worker/tasks/erasure_coding/ec_register.go
+++ b/weed/worker/tasks/erasure_coding/ec_register.go
@@ -1,90 +1,7 @@
package erasure_coding
-import (
- "fmt"
-
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// Factory creates erasure coding task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
-
-// NewFactory creates a new erasure coding task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeErasureCoding,
- []string{"erasure_coding", "storage", "durability"},
- "Convert volumes to erasure coded format for improved durability",
- ),
- }
-}
-
-// Create creates a new erasure coding task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- // Extract additional parameters for comprehensive EC
- masterClient := "localhost:9333" // Default master client
- workDir := "/tmp/seaweedfs_ec_work" // Default work directory
-
- if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" {
- masterClient = mc
- }
- if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" {
- workDir = wd
- }
-
- // Create EC task with comprehensive capabilities
- task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir)
-
- // Set gRPC dial option if provided
- if params.GrpcDialOption != nil {
- task.SetDialOption(params.GrpcDialOption)
- }
-
- task.SetEstimatedDuration(task.EstimateTime(params))
-
- return task, nil
-}
-
-// getSharedInstances returns shared detector and scheduler instances
-func getSharedInstances() (*EcDetector, *Scheduler) {
- // Create shared instances (singleton pattern)
- detector := NewEcDetector()
- scheduler := NewScheduler()
- return detector, scheduler
-}
-
-// GetSharedInstances returns the shared detector and scheduler instances (public API)
-func GetSharedInstances() (*EcDetector, *Scheduler) {
- return getSharedInstances()
-}
-
// Auto-register this task when the package is imported
func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeErasureCoding, factory)
-
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
-
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
-
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ // Use new architecture instead of old registration
+ initErasureCodingV2()
}
diff --git a/weed/worker/tasks/erasure_coding/ec_scheduler.go b/weed/worker/tasks/erasure_coding/ec_scheduler.go
deleted file mode 100644
index d74c4c6ae..000000000
--- a/weed/worker/tasks/erasure_coding/ec_scheduler.go
+++ /dev/null
@@ -1,95 +0,0 @@
-package erasure_coding
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// Scheduler implements erasure coding task scheduling
-type Scheduler struct {
- maxConcurrent int
- enabled bool
-}
-
-// NewScheduler creates a new erasure coding scheduler
-func NewScheduler() *Scheduler {
- return &Scheduler{
- maxConcurrent: 1, // Conservative default
- enabled: false, // Conservative default
- }
-}
-
-// GetTaskType returns the task type
-func (s *Scheduler) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// CanScheduleNow determines if an erasure coding task can be scheduled now
-func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- if !s.enabled {
- return false
- }
-
- // Check if we have available workers
- if len(availableWorkers) == 0 {
- return false
- }
-
- // Count running EC tasks
- runningCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeErasureCoding {
- runningCount++
- }
- }
-
- // Check concurrency limit
- if runningCount >= s.maxConcurrent {
- glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent)
- return false
- }
-
- // Check if any worker can handle EC tasks
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeErasureCoding {
- glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID)
- return true
- }
- }
- }
-
- return false
-}
-
-// GetMaxConcurrent returns the maximum number of concurrent tasks
-func (s *Scheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks
-func (s *Scheduler) GetDefaultRepeatInterval() time.Duration {
- return 24 * time.Hour // Don't repeat EC for 24 hours
-}
-
-// GetPriority returns the priority for this task
-func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
- return types.TaskPriorityLow // EC is not urgent
-}
-
-// IsEnabled returns whether this task type is enabled
-func (s *Scheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// Configuration setters
-
-func (s *Scheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
-}
-
-func (s *Scheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
-}
diff --git a/weed/worker/tasks/erasure_coding/ec_v2.go b/weed/worker/tasks/erasure_coding/ec_v2.go
new file mode 100644
index 000000000..ca2b28768
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/ec_v2.go
@@ -0,0 +1,301 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// ErasureCodingConfigV2 extends BaseConfig with erasure coding specific settings
+type ErasureCodingConfigV2 struct {
+ base.BaseConfig
+ QuietForSeconds int `json:"quiet_for_seconds"`
+ FullnessRatio float64 `json:"fullness_ratio"`
+ CollectionFilter string `json:"collection_filter"`
+}
+
+// ToMap converts config to map (extend base functionality)
+func (c *ErasureCodingConfigV2) ToMap() map[string]interface{} {
+ result := c.BaseConfig.ToMap()
+ result["quiet_for_seconds"] = c.QuietForSeconds
+ result["fullness_ratio"] = c.FullnessRatio
+ result["collection_filter"] = c.CollectionFilter
+ return result
+}
+
+// FromMap loads config from map (extend base functionality)
+func (c *ErasureCodingConfigV2) FromMap(data map[string]interface{}) error {
+ // Load base config first
+ if err := c.BaseConfig.FromMap(data); err != nil {
+ return err
+ }
+
+ // Load erasure coding specific config
+ if quietFor, ok := data["quiet_for_seconds"].(int); ok {
+ c.QuietForSeconds = quietFor
+ }
+ if fullness, ok := data["fullness_ratio"].(float64); ok {
+ c.FullnessRatio = fullness
+ }
+ if filter, ok := data["collection_filter"].(string); ok {
+ c.CollectionFilter = filter
+ }
+ return nil
+}
+
+// ecDetection implements the detection logic for erasure coding tasks
+func ecDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ ecConfig := config.(*ErasureCodingConfigV2)
+ var results []*types.TaskDetectionResult
+ now := time.Now()
+ quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
+ minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum
+
+ for _, metric := range metrics {
+ // Skip if already EC volume
+ if metric.IsECVolume {
+ continue
+ }
+
+ // Check minimum size requirement
+ if metric.Size < minSizeBytes {
+ continue
+ }
+
+ // Check collection filter if specified
+ if ecConfig.CollectionFilter != "" {
+ // Parse comma-separated collections
+ allowedCollections := make(map[string]bool)
+ for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
+ allowedCollections[strings.TrimSpace(collection)] = true
+ }
+ // Skip if volume's collection is not in the allowed list
+ if !allowedCollections[metric.Collection] {
+ continue
+ }
+ }
+
+ // Check quiet duration and fullness criteria
+ if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeErasureCoding,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: types.TaskPriorityLow, // EC is not urgent
+ Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)",
+ metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
+ float64(metric.Size)/(1024*1024)),
+ Parameters: map[string]interface{}{
+ "age_seconds": int(metric.Age.Seconds()),
+ "fullness_ratio": metric.FullnessRatio,
+ "size_mb": int(metric.Size / (1024 * 1024)),
+ },
+ ScheduleAt: now,
+ }
+ results = append(results, result)
+ }
+ }
+
+ return results, nil
+}
+
+// ecScheduling implements the scheduling logic for erasure coding tasks
+func ecScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ ecConfig := config.(*ErasureCodingConfigV2)
+
+ // Check if we have available workers
+ if len(availableWorkers) == 0 {
+ return false
+ }
+
+ // Count running EC tasks
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeErasureCoding {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningCount >= ecConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check if any worker can handle EC tasks
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeErasureCoding {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// createErasureCodingTask creates an erasure coding task instance
+func createErasureCodingTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Validate parameters
+ if params.VolumeID == 0 {
+ return nil, fmt.Errorf("volume_id is required")
+ }
+ if params.Server == "" {
+ return nil, fmt.Errorf("server is required")
+ }
+
+ // Extract additional parameters for comprehensive EC
+ masterClient := "localhost:9333" // Default master client
+ workDir := "/tmp/seaweedfs_ec_work" // Default work directory
+
+ if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" {
+ masterClient = mc
+ }
+ if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" {
+ workDir = wd
+ }
+
+ // Create EC task with comprehensive capabilities
+ task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir)
+
+ // Set gRPC dial option if provided
+ if params.GrpcDialOption != nil {
+ task.SetDialOption(params.GrpcDialOption)
+ }
+
+ task.SetEstimatedDuration(task.EstimateTime(params))
+ return task, nil
+}
+
+// getErasureCodingConfigSpec returns the configuration schema for erasure coding tasks
+func getErasureCodingConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Erasure Coding Tasks",
+ Description: "Whether erasure coding tasks should be automatically created",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "quiet_for_seconds",
+ JSONName: "quiet_for_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 7 * 24 * 60 * 60, // 7 days
+ MinValue: 1 * 24 * 60 * 60, // 1 day
+ MaxValue: 30 * 24 * 60 * 60, // 30 days
+ Required: true,
+ DisplayName: "Quiet For Duration",
+ Description: "Only apply erasure coding to volumes that have not been modified for this duration",
+ Unit: config.UnitDays,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 12 * 60 * 60, // 12 hours
+ MinValue: 2 * 60 * 60, // 2 hours
+ MaxValue: 24 * 60 * 60, // 24 hours
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing erasure coding",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 1,
+ MinValue: 1,
+ MaxValue: 3,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of erasure coding tasks that can run simultaneously",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "fullness_ratio",
+ JSONName: "fullness_ratio",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.9, // 90%
+ MinValue: 0.5,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Fullness Ratio",
+ Description: "Only apply erasure coding to volumes with fullness ratio above this threshold",
+ Placeholder: "0.90 (90%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "collection_filter",
+ JSONName: "collection_filter",
+ Type: config.FieldTypeString,
+ DefaultValue: "",
+ Required: false,
+ DisplayName: "Collection Filter",
+ Description: "Only apply erasure coding to volumes in these collections (comma-separated, leave empty for all)",
+ Placeholder: "collection1,collection2",
+ InputType: "text",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
+
+// initErasureCodingV2 registers the refactored erasure coding task
+func initErasureCodingV2() {
+ // Create configuration instance
+ config := &ErasureCodingConfigV2{
+ BaseConfig: base.BaseConfig{
+ Enabled: false, // Conservative default - enable via configuration
+ ScanIntervalSeconds: 12 * 60 * 60, // 12 hours
+ MaxConcurrent: 1, // Conservative default
+ },
+ QuietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period
+ FullnessRatio: 0.90, // 90% full threshold
+ CollectionFilter: "", // No collection filter by default
+ }
+
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeErasureCoding,
+ Name: "erasure_coding",
+ DisplayName: "Erasure Coding",
+ Description: "Converts volumes to erasure coded format for improved data durability",
+ Icon: "fas fa-shield-alt text-info",
+ Capabilities: []string{"erasure_coding", "storage", "durability"},
+
+ Config: config,
+ ConfigSpec: getErasureCodingConfigSpec(),
+ CreateTask: createErasureCodingTask,
+ DetectionFunc: ecDetection,
+ ScanInterval: 12 * time.Hour,
+ SchedulingFunc: ecScheduling,
+ MaxConcurrent: 1,
+ RepeatInterval: 24 * time.Hour,
+ }
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
+}
diff --git a/weed/worker/tasks/erasure_coding/schema_provider.go b/weed/worker/tasks/erasure_coding/schema_provider.go
deleted file mode 100644
index 0940b6c85..000000000
--- a/weed/worker/tasks/erasure_coding/schema_provider.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package erasure_coding
-
-import (
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
-)
-
-// SchemaProvider implements the TaskConfigSchemaProvider interface for erasure coding tasks
-type SchemaProvider struct{}
-
-// GetConfigSchema returns the schema for erasure coding task configuration
-func (p *SchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
- return GetConfigSchema()
-}
-
-// init registers the erasure coding schema provider
-func init() {
- tasks.RegisterTaskConfigSchema("erasure_coding", &SchemaProvider{})
-}
diff --git a/weed/worker/tasks/erasure_coding/ui.go b/weed/worker/tasks/erasure_coding/ui.go
deleted file mode 100644
index 2ab338722..000000000
--- a/weed/worker/tasks/erasure_coding/ui.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package erasure_coding
-
-import (
- "fmt"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// ErasureCodingConfig represents the erasure coding configuration matching the schema
-type ErasureCodingConfig struct {
- Enabled bool `json:"enabled"`
- QuietForSeconds int `json:"quiet_for_seconds"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- FullnessRatio float64 `json:"fullness_ratio"`
- CollectionFilter string `json:"collection_filter"`
-}
-
-// ErasureCodingUILogic contains the business logic for erasure coding UI operations
-type ErasureCodingUILogic struct {
- detector *EcDetector
- scheduler *Scheduler
-}
-
-// NewErasureCodingUILogic creates new erasure coding UI logic
-func NewErasureCodingUILogic(detector *EcDetector, scheduler *Scheduler) *ErasureCodingUILogic {
- return &ErasureCodingUILogic{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetCurrentConfig returns the current erasure coding configuration
-func (logic *ErasureCodingUILogic) GetCurrentConfig() interface{} {
- config := ErasureCodingConfig{
- // Default values from schema (matching task_config_schema.go)
- Enabled: true,
- QuietForSeconds: 7 * 24 * 60 * 60, // 7 days
- ScanIntervalSeconds: 12 * 60 * 60, // 12 hours
- MaxConcurrent: 1,
- FullnessRatio: 0.9, // 90%
- CollectionFilter: "",
- }
-
- // Get current values from detector
- if logic.detector != nil {
- config.Enabled = logic.detector.IsEnabled()
- config.QuietForSeconds = logic.detector.GetQuietForSeconds()
- config.FullnessRatio = logic.detector.GetFullnessRatio()
- config.CollectionFilter = logic.detector.GetCollectionFilter()
- config.ScanIntervalSeconds = int(logic.detector.ScanInterval().Seconds())
- }
-
- // Get current values from scheduler
- if logic.scheduler != nil {
- config.MaxConcurrent = logic.scheduler.GetMaxConcurrent()
- }
-
- return config
-}
-
-// ApplyConfig applies the erasure coding configuration
-func (logic *ErasureCodingUILogic) ApplyConfig(config interface{}) error {
- ecConfig, ok := config.(ErasureCodingConfig)
- if !ok {
- return fmt.Errorf("invalid configuration type for erasure coding")
- }
-
- // Apply to detector
- if logic.detector != nil {
- logic.detector.SetEnabled(ecConfig.Enabled)
- logic.detector.SetQuietForSeconds(ecConfig.QuietForSeconds)
- logic.detector.SetFullnessRatio(ecConfig.FullnessRatio)
- logic.detector.SetCollectionFilter(ecConfig.CollectionFilter)
- logic.detector.SetScanInterval(time.Duration(ecConfig.ScanIntervalSeconds) * time.Second)
- }
-
- // Apply to scheduler
- if logic.scheduler != nil {
- logic.scheduler.SetEnabled(ecConfig.Enabled)
- logic.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent)
- }
-
- glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, quiet_for=%v seconds, max_concurrent=%d, fullness_ratio=%f, collection_filter=%s",
- ecConfig.Enabled, ecConfig.QuietForSeconds, ecConfig.MaxConcurrent, ecConfig.FullnessRatio, ecConfig.CollectionFilter)
-
- return nil
-}
-
-// RegisterUI registers the erasure coding UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *EcDetector, scheduler *Scheduler) {
- logic := NewErasureCodingUILogic(detector, scheduler)
-
- tasks.CommonRegisterUI(
- types.TaskTypeErasureCoding,
- "Erasure Coding",
- uiRegistry,
- detector,
- scheduler,
- GetConfigSchema,
- logic.GetCurrentConfig,
- logic.ApplyConfig,
- )
-}
diff --git a/weed/worker/tasks/vacuum/config_schema.go b/weed/worker/tasks/vacuum/config_schema.go
deleted file mode 100644
index 3c063f4ee..000000000
--- a/weed/worker/tasks/vacuum/config_schema.go
+++ /dev/null
@@ -1,112 +0,0 @@
-package vacuum
-
-import (
- "github.com/seaweedfs/seaweedfs/weed/admin/config"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
-)
-
-// GetConfigSchema returns the schema for vacuum task configuration
-func GetConfigSchema() *tasks.TaskConfigSchema {
- return &tasks.TaskConfigSchema{
- TaskName: "vacuum",
- DisplayName: "Volume Vacuum",
- Description: "Reclaims disk space by removing deleted files from volumes",
- Icon: "fas fa-broom text-primary",
- Schema: config.Schema{
- Fields: []*config.Field{
- {
- Name: "enabled",
- JSONName: "enabled",
- Type: config.FieldTypeBool,
- DefaultValue: true,
- Required: false,
- DisplayName: "Enable Vacuum Tasks",
- Description: "Whether vacuum tasks should be automatically created",
- HelpText: "Toggle this to enable or disable automatic vacuum task generation",
- InputType: "checkbox",
- CSSClasses: "form-check-input",
- },
- {
- Name: "garbage_threshold",
- JSONName: "garbage_threshold",
- Type: config.FieldTypeFloat,
- DefaultValue: 0.3, // 30%
- MinValue: 0.0,
- MaxValue: 1.0,
- Required: true,
- DisplayName: "Garbage Percentage Threshold",
- Description: "Trigger vacuum when garbage ratio exceeds this percentage",
- HelpText: "Volumes with more deleted content than this threshold will be vacuumed",
- Placeholder: "0.30 (30%)",
- Unit: config.UnitNone,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "scan_interval_seconds",
- JSONName: "scan_interval_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 2 * 60 * 60, // 2 hours
- MinValue: 10 * 60, // 10 minutes
- MaxValue: 24 * 60 * 60, // 24 hours
- Required: true,
- DisplayName: "Scan Interval",
- Description: "How often to scan for volumes needing vacuum",
- HelpText: "The system will check for volumes that need vacuuming at this interval",
- Placeholder: "2",
- Unit: config.UnitHours,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "max_concurrent",
- JSONName: "max_concurrent",
- Type: config.FieldTypeInt,
- DefaultValue: 2,
- MinValue: 1,
- MaxValue: 10,
- Required: true,
- DisplayName: "Max Concurrent Tasks",
- Description: "Maximum number of vacuum tasks that can run simultaneously",
- HelpText: "Limits the number of vacuum operations running at the same time to control system load",
- Placeholder: "2 (default)",
- Unit: config.UnitCount,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "min_volume_age_seconds",
- JSONName: "min_volume_age_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 24 * 60 * 60, // 24 hours
- MinValue: 1 * 60 * 60, // 1 hour
- MaxValue: 7 * 24 * 60 * 60, // 7 days
- Required: true,
- DisplayName: "Minimum Volume Age",
- Description: "Only vacuum volumes older than this duration",
- HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to",
- Placeholder: "24",
- Unit: config.UnitHours,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "min_interval_seconds",
- JSONName: "min_interval_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 7 * 24 * 60 * 60, // 7 days
- MinValue: 1 * 24 * 60 * 60, // 1 day
- MaxValue: 30 * 24 * 60 * 60, // 30 days
- Required: true,
- DisplayName: "Minimum Interval",
- Description: "Minimum time between vacuum operations on the same volume",
- HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
- Placeholder: "7",
- Unit: config.UnitDays,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- },
- },
- }
-}
diff --git a/weed/worker/tasks/vacuum/schema_provider.go b/weed/worker/tasks/vacuum/schema_provider.go
deleted file mode 100644
index a3872b049..000000000
--- a/weed/worker/tasks/vacuum/schema_provider.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package vacuum
-
-import (
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
-)
-
-// SchemaProvider implements the TaskConfigSchemaProvider interface for vacuum tasks
-type SchemaProvider struct{}
-
-// GetConfigSchema returns the schema for vacuum task configuration
-func (p *SchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
- return GetConfigSchema()
-}
-
-// init registers the vacuum schema provider
-func init() {
- tasks.RegisterTaskConfigSchema("vacuum", &SchemaProvider{})
-}
diff --git a/weed/worker/tasks/vacuum/ui.go b/weed/worker/tasks/vacuum/ui.go
deleted file mode 100644
index a6176ffa6..000000000
--- a/weed/worker/tasks/vacuum/ui.go
+++ /dev/null
@@ -1,112 +0,0 @@
-package vacuum
-
-import (
- "fmt"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// VacuumConfig represents the vacuum configuration matching the schema
-type VacuumConfig struct {
- Enabled bool `json:"enabled"`
- GarbageThreshold float64 `json:"garbage_threshold"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
- MinIntervalSeconds int `json:"min_interval_seconds"`
-}
-
-// VacuumUILogic contains the business logic for vacuum UI operations
-type VacuumUILogic struct {
- detector *VacuumDetector
- scheduler *VacuumScheduler
-}
-
-// NewVacuumUILogic creates new vacuum UI logic
-func NewVacuumUILogic(detector *VacuumDetector, scheduler *VacuumScheduler) *VacuumUILogic {
- return &VacuumUILogic{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetCurrentConfig returns the current vacuum configuration
-func (logic *VacuumUILogic) GetCurrentConfig() interface{} {
- config := &VacuumConfig{
- // Default values from schema (matching task_config_schema.go)
- Enabled: true,
- GarbageThreshold: 0.3, // 30%
- ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
- MaxConcurrent: 2,
- MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
- MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
- }
-
- // Get current values from detector
- if logic.detector != nil {
- config.Enabled = logic.detector.IsEnabled()
- config.GarbageThreshold = logic.detector.GetGarbageThreshold()
- config.ScanIntervalSeconds = int(logic.detector.ScanInterval().Seconds())
- config.MinVolumeAgeSeconds = int(logic.detector.GetMinVolumeAge().Seconds())
- }
-
- // Get current values from scheduler
- if logic.scheduler != nil {
- config.MaxConcurrent = logic.scheduler.GetMaxConcurrent()
- config.MinIntervalSeconds = int(logic.scheduler.GetMinInterval().Seconds())
- }
-
- return config
-}
-
-// ApplyConfig applies the vacuum configuration
-func (logic *VacuumUILogic) ApplyConfig(config interface{}) error {
- vacuumConfig, ok := config.(*VacuumConfig)
- if !ok {
- return fmt.Errorf("invalid configuration type for vacuum")
- }
-
- // Apply to detector
- if logic.detector != nil {
- logic.detector.SetEnabled(vacuumConfig.Enabled)
- logic.detector.SetGarbageThreshold(vacuumConfig.GarbageThreshold)
- logic.detector.SetScanInterval(time.Duration(vacuumConfig.ScanIntervalSeconds) * time.Second)
- logic.detector.SetMinVolumeAge(time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second)
- }
-
- // Apply to scheduler
- if logic.scheduler != nil {
- logic.scheduler.SetEnabled(vacuumConfig.Enabled)
- logic.scheduler.SetMaxConcurrent(vacuumConfig.MaxConcurrent)
- logic.scheduler.SetMinInterval(time.Duration(vacuumConfig.MinIntervalSeconds) * time.Second)
- }
-
- glog.V(1).Infof("Applied vacuum configuration: enabled=%v, threshold=%.1f%%, scan_interval=%ds, max_concurrent=%d",
- vacuumConfig.Enabled, vacuumConfig.GarbageThreshold*100, vacuumConfig.ScanIntervalSeconds, vacuumConfig.MaxConcurrent)
-
- return nil
-}
-
-// RegisterUI registers the vacuum UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *VacuumDetector, scheduler *VacuumScheduler) {
- logic := NewVacuumUILogic(detector, scheduler)
-
- tasks.CommonRegisterUI(
- types.TaskTypeVacuum,
- "Volume Vacuum",
- uiRegistry,
- detector,
- scheduler,
- GetConfigSchema,
- logic.GetCurrentConfig,
- logic.ApplyConfig,
- )
-}
-
-// GetUIProvider returns the UI provider for external use
-func GetUIProvider(uiRegistry *types.UIRegistry) types.TaskUIProvider {
- return uiRegistry.GetProvider(types.TaskTypeVacuum)
-}
diff --git a/weed/worker/tasks/vacuum/vacuum_detector.go b/weed/worker/tasks/vacuum/vacuum_detector.go
deleted file mode 100644
index 6d7230c6c..000000000
--- a/weed/worker/tasks/vacuum/vacuum_detector.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package vacuum
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// VacuumDetector implements vacuum task detection using code instead of schemas
-type VacuumDetector struct {
- enabled bool
- garbageThreshold float64
- minVolumeAge time.Duration
- scanInterval time.Duration
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*VacuumDetector)(nil)
- _ types.PolicyConfigurableDetector = (*VacuumDetector)(nil)
-)
-
-// NewVacuumDetector creates a new simple vacuum detector
-func NewVacuumDetector() *VacuumDetector {
- return &VacuumDetector{
- enabled: true,
- garbageThreshold: 0.3,
- minVolumeAge: 24 * time.Hour,
- scanInterval: 30 * time.Minute,
- }
-}
-
-// GetTaskType returns the task type
-func (d *VacuumDetector) GetTaskType() types.TaskType {
- return types.TaskTypeVacuum
-}
-
-// ScanForTasks scans for volumes that need vacuum operations
-func (d *VacuumDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- return nil, nil
- }
-
- var results []*types.TaskDetectionResult
-
- for _, metric := range volumeMetrics {
- // Check if volume needs vacuum
- if metric.GarbageRatio >= d.garbageThreshold && metric.Age >= d.minVolumeAge {
- // Higher priority for volumes with more garbage
- priority := types.TaskPriorityNormal
- if metric.GarbageRatio > 0.6 {
- priority = types.TaskPriorityHigh
- }
-
- result := &types.TaskDetectionResult{
- TaskType: types.TaskTypeVacuum,
- VolumeID: metric.VolumeID,
- Server: metric.Server,
- Collection: metric.Collection,
- Priority: priority,
- Reason: "Volume has excessive garbage requiring vacuum",
- Parameters: map[string]interface{}{
- "garbage_ratio": metric.GarbageRatio,
- "volume_age": metric.Age.String(),
- },
- ScheduleAt: time.Now(),
- }
- results = append(results, result)
- }
- }
-
- glog.V(2).Infof("Vacuum detector found %d volumes needing vacuum", len(results))
- return results, nil
-}
-
-// ScanInterval returns how often this detector should scan
-func (d *VacuumDetector) ScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// IsEnabled returns whether this detector is enabled
-func (d *VacuumDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// Configuration setters
-
-func (d *VacuumDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
-}
-
-func (d *VacuumDetector) SetGarbageThreshold(threshold float64) {
- d.garbageThreshold = threshold
-}
-
-func (d *VacuumDetector) SetScanInterval(interval time.Duration) {
- d.scanInterval = interval
-}
-
-func (d *VacuumDetector) SetMinVolumeAge(age time.Duration) {
- d.minVolumeAge = age
-}
-
-// GetGarbageThreshold returns the current garbage threshold
-func (d *VacuumDetector) GetGarbageThreshold() float64 {
- return d.garbageThreshold
-}
-
-// GetMinVolumeAge returns the minimum volume age
-func (d *VacuumDetector) GetMinVolumeAge() time.Duration {
- return d.minVolumeAge
-}
-
-// GetScanInterval returns the scan interval
-func (d *VacuumDetector) GetScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// ConfigureFromPolicy configures the detector based on the maintenance policy
-func (d *VacuumDetector) ConfigureFromPolicy(policy interface{}) {
- // Type assert to the maintenance policy type we expect
- if maintenancePolicy, ok := policy.(interface {
- GetVacuumEnabled() bool
- GetVacuumGarbageRatio() float64
- }); ok {
- d.SetEnabled(maintenancePolicy.GetVacuumEnabled())
- d.SetGarbageThreshold(maintenancePolicy.GetVacuumGarbageRatio())
- } else {
- glog.V(1).Infof("Could not configure vacuum detector from policy: unsupported policy type")
- }
-}
diff --git a/weed/worker/tasks/vacuum/vacuum_register.go b/weed/worker/tasks/vacuum/vacuum_register.go
index 7d930a88e..42a32d287 100644
--- a/weed/worker/tasks/vacuum/vacuum_register.go
+++ b/weed/worker/tasks/vacuum/vacuum_register.go
@@ -1,81 +1,7 @@
package vacuum
-import (
- "fmt"
-
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// Factory creates vacuum task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
-
-// NewFactory creates a new vacuum task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeVacuum,
- []string{"vacuum", "storage"},
- "Vacuum operation to reclaim disk space by removing deleted files",
- ),
- }
-}
-
-// Create creates a new vacuum task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID)
- task.SetEstimatedDuration(task.EstimateTime(params))
-
- return task, nil
-}
-
-// Shared detector and scheduler instances
-var (
- sharedDetector *VacuumDetector
- sharedScheduler *VacuumScheduler
-)
-
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*VacuumDetector, *VacuumScheduler) {
- if sharedDetector == nil {
- sharedDetector = NewVacuumDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewVacuumScheduler()
- }
- return sharedDetector, sharedScheduler
-}
-
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*VacuumDetector, *VacuumScheduler) {
- return getSharedInstances()
-}
-
// Auto-register this task when the package is imported
func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeVacuum, factory)
-
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
-
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
-
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ // Use new architecture instead of old registration
+ initVacuumV2()
}
diff --git a/weed/worker/tasks/vacuum/vacuum_scheduler.go b/weed/worker/tasks/vacuum/vacuum_scheduler.go
deleted file mode 100644
index 2b67a9f40..000000000
--- a/weed/worker/tasks/vacuum/vacuum_scheduler.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package vacuum
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// VacuumScheduler implements vacuum task scheduling using code instead of schemas
-type VacuumScheduler struct {
- enabled bool
- maxConcurrent int
- minInterval time.Duration
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskScheduler = (*VacuumScheduler)(nil)
-)
-
-// NewVacuumScheduler creates a new simple vacuum scheduler
-func NewVacuumScheduler() *VacuumScheduler {
- return &VacuumScheduler{
- enabled: true,
- maxConcurrent: 2,
- minInterval: 6 * time.Hour,
- }
-}
-
-// GetTaskType returns the task type
-func (s *VacuumScheduler) GetTaskType() types.TaskType {
- return types.TaskTypeVacuum
-}
-
-// CanScheduleNow determines if a vacuum task can be scheduled right now
-func (s *VacuumScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- // Check if scheduler is enabled
- if !s.enabled {
- return false
- }
-
- // Check concurrent limit
- runningVacuumCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeVacuum {
- runningVacuumCount++
- }
- }
-
- if runningVacuumCount >= s.maxConcurrent {
- return false
- }
-
- // Check if there's an available worker with vacuum capability
- for _, worker := range availableWorkers {
- if worker.CurrentLoad < worker.MaxConcurrent {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeVacuum {
- return true
- }
- }
- }
- }
-
- return false
-}
-
-// GetPriority returns the priority for this task
-func (s *VacuumScheduler) GetPriority(task *types.Task) types.TaskPriority {
- // Could adjust priority based on task parameters
- if params, ok := task.Parameters["garbage_ratio"].(float64); ok {
- if params > 0.8 {
- return types.TaskPriorityHigh
- }
- }
- return task.Priority
-}
-
-// GetMaxConcurrent returns max concurrent tasks of this type
-func (s *VacuumScheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating vacuum tasks
-func (s *VacuumScheduler) GetDefaultRepeatInterval() time.Duration {
- return s.minInterval
-}
-
-// IsEnabled returns whether this scheduler is enabled
-func (s *VacuumScheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// Configuration setters
-
-func (s *VacuumScheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
-}
-
-func (s *VacuumScheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
-}
-
-func (s *VacuumScheduler) SetMinInterval(interval time.Duration) {
- s.minInterval = interval
-}
-
-// GetMinInterval returns the minimum interval
-func (s *VacuumScheduler) GetMinInterval() time.Duration {
- return s.minInterval
-}
diff --git a/weed/worker/tasks/vacuum/vacuum_v2.go b/weed/worker/tasks/vacuum/vacuum_v2.go
new file mode 100644
index 000000000..29129cb0f
--- /dev/null
+++ b/weed/worker/tasks/vacuum/vacuum_v2.go
@@ -0,0 +1,269 @@
+package vacuum
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// VacuumConfigV2 extends BaseConfig with vacuum-specific settings
+type VacuumConfigV2 struct {
+ base.BaseConfig
+ GarbageThreshold float64 `json:"garbage_threshold"`
+ MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
+ MinIntervalSeconds int `json:"min_interval_seconds"`
+}
+
+// ToMap converts config to map (extend base functionality)
+func (c *VacuumConfigV2) ToMap() map[string]interface{} {
+ result := c.BaseConfig.ToMap()
+ result["garbage_threshold"] = c.GarbageThreshold
+ result["min_volume_age_seconds"] = c.MinVolumeAgeSeconds
+ result["min_interval_seconds"] = c.MinIntervalSeconds
+ return result
+}
+
+// FromMap loads config from map (extend base functionality)
+func (c *VacuumConfigV2) FromMap(data map[string]interface{}) error {
+ // Load base config first
+ if err := c.BaseConfig.FromMap(data); err != nil {
+ return err
+ }
+
+ // Load vacuum-specific config
+ if threshold, ok := data["garbage_threshold"].(float64); ok {
+ c.GarbageThreshold = threshold
+ }
+ if ageSeconds, ok := data["min_volume_age_seconds"].(int); ok {
+ c.MinVolumeAgeSeconds = ageSeconds
+ }
+ if intervalSeconds, ok := data["min_interval_seconds"].(int); ok {
+ c.MinIntervalSeconds = intervalSeconds
+ }
+ return nil
+}
+
+// vacuumDetection implements the detection logic for vacuum tasks
+func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ vacuumConfig := config.(*VacuumConfigV2)
+ var results []*types.TaskDetectionResult
+ minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
+
+ for _, metric := range metrics {
+ // Check if volume needs vacuum
+ if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
+ priority := types.TaskPriorityNormal
+ if metric.GarbageRatio > 0.6 {
+ priority = types.TaskPriorityHigh
+ }
+
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeVacuum,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: priority,
+ Reason: "Volume has excessive garbage requiring vacuum",
+ Parameters: map[string]interface{}{
+ "garbage_ratio": metric.GarbageRatio,
+ "volume_age": metric.Age.String(),
+ },
+ ScheduleAt: time.Now(),
+ }
+ results = append(results, result)
+ }
+ }
+
+ return results, nil
+}
+
+// vacuumScheduling implements the scheduling logic for vacuum tasks
+func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ vacuumConfig := config.(*VacuumConfigV2)
+
+ // Count running vacuum tasks
+ runningVacuumCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeVacuum {
+ runningVacuumCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningVacuumCount >= vacuumConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check for available workers with vacuum capability
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeVacuum {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// createVacuumTask creates a vacuum task instance
+func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Validate parameters
+ if params.VolumeID == 0 {
+ return nil, fmt.Errorf("volume_id is required")
+ }
+ if params.Server == "" {
+ return nil, fmt.Errorf("server is required")
+ }
+
+ // Use existing vacuum task implementation
+ task := NewTask(params.Server, params.VolumeID)
+ task.SetEstimatedDuration(task.EstimateTime(params))
+ return task, nil
+}
+
+// getVacuumConfigSpec returns the configuration schema for vacuum tasks
+func getVacuumConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Vacuum Tasks",
+ Description: "Whether vacuum tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic vacuum task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "garbage_threshold",
+ JSONName: "garbage_threshold",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.3,
+ MinValue: 0.0,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Garbage Percentage Threshold",
+ Description: "Trigger vacuum when garbage ratio exceeds this percentage",
+ HelpText: "Volumes with more deleted content than this threshold will be vacuumed",
+ Placeholder: "0.30 (30%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 2 * 60 * 60,
+ MinValue: 10 * 60,
+ MaxValue: 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing vacuum",
+ HelpText: "The system will check for volumes that need vacuuming at this interval",
+ Placeholder: "2",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 2,
+ MinValue: 1,
+ MaxValue: 10,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of vacuum tasks that can run simultaneously",
+ HelpText: "Limits the number of vacuum operations running at the same time to control system load",
+ Placeholder: "2 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_volume_age_seconds",
+ JSONName: "min_volume_age_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 24 * 60 * 60,
+ MinValue: 1 * 60 * 60,
+ MaxValue: 7 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Volume Age",
+ Description: "Only vacuum volumes older than this duration",
+ HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to",
+ Placeholder: "24",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_interval_seconds",
+ JSONName: "min_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 7 * 24 * 60 * 60,
+ MinValue: 1 * 24 * 60 * 60,
+ MaxValue: 30 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Interval",
+ Description: "Minimum time between vacuum operations on the same volume",
+ HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
+ Placeholder: "7",
+ Unit: config.UnitDays,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
+
+// initVacuumV2 registers the refactored vacuum task (replaces the old registration)
+func initVacuumV2() {
+ // Create configuration instance
+ config := &VacuumConfigV2{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
+ MaxConcurrent: 2,
+ },
+ GarbageThreshold: 0.3, // 30%
+ MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ }
+
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeVacuum,
+ Name: "vacuum",
+ DisplayName: "Volume Vacuum",
+ Description: "Reclaims disk space by removing deleted files from volumes",
+ Icon: "fas fa-broom text-primary",
+ Capabilities: []string{"vacuum", "storage"},
+
+ Config: config,
+ ConfigSpec: getVacuumConfigSpec(),
+ CreateTask: createVacuumTask,
+ DetectionFunc: vacuumDetection,
+ ScanInterval: 2 * time.Hour,
+ SchedulingFunc: vacuumScheduling,
+ MaxConcurrent: 2,
+ RepeatInterval: 7 * 24 * time.Hour,
+ }
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
+}
diff --git a/weed/worker/tasks/vacuum_v2/vacuum.go b/weed/worker/tasks/vacuum_v2/vacuum.go
new file mode 100644
index 000000000..3034d07d0
--- /dev/null
+++ b/weed/worker/tasks/vacuum_v2/vacuum.go
@@ -0,0 +1,260 @@
+package vacuum_v2
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// VacuumConfig extends BaseConfig with vacuum-specific settings
+type VacuumConfig struct {
+ base.BaseConfig
+ GarbageThreshold float64 `json:"garbage_threshold"`
+ MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
+ MinIntervalSeconds int `json:"min_interval_seconds"`
+}
+
+// ToMap converts config to map (extend base functionality)
+func (c *VacuumConfig) ToMap() map[string]interface{} {
+ result := c.BaseConfig.ToMap()
+ result["garbage_threshold"] = c.GarbageThreshold
+ result["min_volume_age_seconds"] = c.MinVolumeAgeSeconds
+ result["min_interval_seconds"] = c.MinIntervalSeconds
+ return result
+}
+
+// FromMap loads config from map (extend base functionality)
+func (c *VacuumConfig) FromMap(data map[string]interface{}) error {
+ // Load base config first
+ if err := c.BaseConfig.FromMap(data); err != nil {
+ return err
+ }
+
+ // Load vacuum-specific config
+ if threshold, ok := data["garbage_threshold"].(float64); ok {
+ c.GarbageThreshold = threshold
+ }
+ if ageSeconds, ok := data["min_volume_age_seconds"].(int); ok {
+ c.MinVolumeAgeSeconds = ageSeconds
+ }
+ if intervalSeconds, ok := data["min_interval_seconds"].(int); ok {
+ c.MinIntervalSeconds = intervalSeconds
+ }
+ return nil
+}
+
+// vacuumDetection implements the detection logic for vacuum tasks
+func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ vacuumConfig := config.(*VacuumConfig)
+ var results []*types.TaskDetectionResult
+ minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
+
+ for _, metric := range metrics {
+ // Check if volume needs vacuum
+ if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
+ priority := types.TaskPriorityNormal
+ if metric.GarbageRatio > 0.6 {
+ priority = types.TaskPriorityHigh
+ }
+
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeVacuum,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: priority,
+ Reason: "Volume has excessive garbage requiring vacuum",
+ Parameters: map[string]interface{}{
+ "garbage_ratio": metric.GarbageRatio,
+ "volume_age": metric.Age.String(),
+ },
+ ScheduleAt: time.Now(),
+ }
+ results = append(results, result)
+ }
+ }
+
+ return results, nil
+}
+
+// vacuumScheduling implements the scheduling logic for vacuum tasks
+func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ vacuumConfig := config.(*VacuumConfig)
+
+ // Count running vacuum tasks
+ runningVacuumCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeVacuum {
+ runningVacuumCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningVacuumCount >= vacuumConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check for available workers with vacuum capability
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeVacuum {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// createVacuumTask creates a vacuum task instance
+func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Validate parameters
+ if params.VolumeID == 0 {
+ return nil, fmt.Errorf("volume_id is required")
+ }
+ if params.Server == "" {
+ return nil, fmt.Errorf("server is required")
+ }
+
+ // Reuse existing vacuum task implementation
+ task := vacuum.NewTask(params.Server, params.VolumeID)
+ task.SetEstimatedDuration(task.EstimateTime(params))
+ return task, nil
+}
+
+// getConfigSpec returns the configuration schema for vacuum tasks
+func getConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Vacuum Tasks",
+ Description: "Whether vacuum tasks should be automatically created",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "garbage_threshold",
+ JSONName: "garbage_threshold",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.3,
+ MinValue: 0.0,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Garbage Percentage Threshold",
+ Description: "Trigger vacuum when garbage ratio exceeds this percentage",
+ Placeholder: "0.30 (30%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 2 * 60 * 60,
+ MinValue: 10 * 60,
+ MaxValue: 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing vacuum",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 2,
+ MinValue: 1,
+ MaxValue: 10,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of vacuum tasks that can run simultaneously",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_volume_age_seconds",
+ JSONName: "min_volume_age_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 24 * 60 * 60,
+ MinValue: 1 * 60 * 60,
+ MaxValue: 7 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Volume Age",
+ Description: "Only vacuum volumes older than this duration",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_interval_seconds",
+ JSONName: "min_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 7 * 24 * 60 * 60,
+ MinValue: 1 * 24 * 60 * 60,
+ MaxValue: 30 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Interval",
+ Description: "Minimum time between vacuum operations on the same volume",
+ Unit: config.UnitDays,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
+
+// RegisterVacuumV2 registers the refactored vacuum task
+func init() {
+ // Create configuration instance
+ config := &VacuumConfig{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
+ MaxConcurrent: 2,
+ },
+ GarbageThreshold: 0.3, // 30%
+ MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ }
+
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeVacuum,
+ Name: "vacuum_v2",
+ DisplayName: "Volume Vacuum (V2)",
+ Description: "Reclaims disk space by removing deleted files from volumes",
+ Icon: "fas fa-broom text-primary",
+ Capabilities: []string{"vacuum", "storage"},
+
+ Config: config,
+ ConfigSpec: getConfigSpec(),
+ CreateTask: createVacuumTask,
+ DetectionFunc: vacuumDetection,
+ ScanInterval: 2 * time.Hour,
+ SchedulingFunc: vacuumScheduling,
+ MaxConcurrent: 2,
+ RepeatInterval: 7 * 24 * time.Hour,
+ }
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
+}