diff options
26 files changed, 1468 insertions, 1860 deletions
diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go index a587d1b96..6ff40a8db 100644 --- a/weed/admin/handlers/maintenance_handlers.go +++ b/weed/admin/handlers/maintenance_handlers.go @@ -192,11 +192,11 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) { var config interface{} switch taskType { case types.TaskTypeVacuum: - config = &vacuum.VacuumConfig{} + config = &vacuum.VacuumConfigV2{} case types.TaskTypeBalance: - config = &balance.BalanceConfig{} + config = &balance.BalanceConfigV2{} case types.TaskTypeErasureCoding: - config = &erasure_coding.ErasureCodingConfig{} + config = &erasure_coding.ErasureCodingConfigV2{} default: c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName}) return diff --git a/weed/worker/tasks/balance/balance_detector.go b/weed/worker/tasks/balance/balance_detector.go deleted file mode 100644 index f082b7a77..000000000 --- a/weed/worker/tasks/balance/balance_detector.go +++ /dev/null @@ -1,171 +0,0 @@ -package balance - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// BalanceDetector implements TaskDetector for balance tasks -type BalanceDetector struct { - enabled bool - threshold float64 // Imbalance threshold (0.1 = 10%) - minCheckInterval time.Duration - minVolumeCount int - lastCheck time.Time -} - -// Compile-time interface assertions -var ( - _ types.TaskDetector = (*BalanceDetector)(nil) -) - -// NewBalanceDetector creates a new balance detector -func NewBalanceDetector() *BalanceDetector { - return &BalanceDetector{ - enabled: true, - threshold: 0.1, // 10% imbalance threshold - minCheckInterval: 1 * time.Hour, - minVolumeCount: 10, // Don't balance small clusters - lastCheck: time.Time{}, - } -} - -// GetTaskType returns the task type -func (d *BalanceDetector) GetTaskType() types.TaskType { - return types.TaskTypeBalance -} - -// 
ScanForTasks checks if cluster balance is needed -func (d *BalanceDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { - if !d.enabled { - return nil, nil - } - - glog.V(2).Infof("Scanning for balance tasks...") - - // Don't check too frequently - if time.Since(d.lastCheck) < d.minCheckInterval { - return nil, nil - } - d.lastCheck = time.Now() - - // Skip if cluster is too small - if len(volumeMetrics) < d.minVolumeCount { - glog.V(2).Infof("Cluster too small for balance (%d volumes < %d minimum)", len(volumeMetrics), d.minVolumeCount) - return nil, nil - } - - // Analyze volume distribution across servers - serverVolumeCounts := make(map[string]int) - for _, metric := range volumeMetrics { - serverVolumeCounts[metric.Server]++ - } - - if len(serverVolumeCounts) < 2 { - glog.V(2).Infof("Not enough servers for balance (%d servers)", len(serverVolumeCounts)) - return nil, nil - } - - // Calculate balance metrics - totalVolumes := len(volumeMetrics) - avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) - - maxVolumes := 0 - minVolumes := totalVolumes - maxServer := "" - minServer := "" - - for server, count := range serverVolumeCounts { - if count > maxVolumes { - maxVolumes = count - maxServer = server - } - if count < minVolumes { - minVolumes = count - minServer = server - } - } - - // Check if imbalance exceeds threshold - imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer - if imbalanceRatio <= d.threshold { - glog.V(2).Infof("Cluster is balanced (imbalance ratio: %.2f <= %.2f)", imbalanceRatio, d.threshold) - return nil, nil - } - - // Create balance task - reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", - imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - - task := &types.TaskDetectionResult{ - TaskType: types.TaskTypeBalance, - Priority: 
types.TaskPriorityNormal, - Reason: reason, - ScheduleAt: time.Now(), - Parameters: map[string]interface{}{ - "imbalance_ratio": imbalanceRatio, - "threshold": d.threshold, - "max_volumes": maxVolumes, - "min_volumes": minVolumes, - "avg_volumes_per_server": avgVolumesPerServer, - "max_server": maxServer, - "min_server": minServer, - "total_servers": len(serverVolumeCounts), - }, - } - - glog.V(1).Infof("π Found balance task: %s", reason) - return []*types.TaskDetectionResult{task}, nil -} - -// ScanInterval returns how often to scan -func (d *BalanceDetector) ScanInterval() time.Duration { - return d.minCheckInterval -} - -// IsEnabled returns whether the detector is enabled -func (d *BalanceDetector) IsEnabled() bool { - return d.enabled -} - -// SetEnabled sets whether the detector is enabled -func (d *BalanceDetector) SetEnabled(enabled bool) { - d.enabled = enabled - glog.V(1).Infof("π Balance detector enabled: %v", enabled) -} - -// SetThreshold sets the imbalance threshold -func (d *BalanceDetector) SetThreshold(threshold float64) { - d.threshold = threshold - glog.V(1).Infof("π Balance threshold set to: %.1f%%", threshold*100) -} - -// SetMinCheckInterval sets the minimum time between balance checks -func (d *BalanceDetector) SetMinCheckInterval(interval time.Duration) { - d.minCheckInterval = interval - glog.V(1).Infof("π Balance check interval set to: %v", interval) -} - -// SetMinVolumeCount sets the minimum volume count for balance operations -func (d *BalanceDetector) SetMinVolumeCount(count int) { - d.minVolumeCount = count - glog.V(1).Infof("π Balance minimum volume count set to: %d", count) -} - -// GetThreshold returns the current imbalance threshold -func (d *BalanceDetector) GetThreshold() float64 { - return d.threshold -} - -// GetMinCheckInterval returns the minimum check interval -func (d *BalanceDetector) GetMinCheckInterval() time.Duration { - return d.minCheckInterval -} - -// GetMinVolumeCount returns the minimum volume count -func (d 
*BalanceDetector) GetMinVolumeCount() int { - return d.minVolumeCount -} diff --git a/weed/worker/tasks/balance/balance_register.go b/weed/worker/tasks/balance/balance_register.go index 7c2d5a520..def779389 100644 --- a/weed/worker/tasks/balance/balance_register.go +++ b/weed/worker/tasks/balance/balance_register.go @@ -1,81 +1,7 @@ package balance -import ( - "fmt" - - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Factory creates balance task instances -type Factory struct { - *tasks.BaseTaskFactory -} - -// NewFactory creates a new balance task factory -func NewFactory() *Factory { - return &Factory{ - BaseTaskFactory: tasks.NewBaseTaskFactory( - types.TaskTypeBalance, - []string{"balance", "storage", "optimization"}, - "Balance data across volume servers for optimal performance", - ), - } -} - -// Create creates a new balance task instance -func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - task := NewTask(params.Server, params.VolumeID, params.Collection) - task.SetEstimatedDuration(task.EstimateTime(params)) - - return task, nil -} - -// Shared detector and scheduler instances -var ( - sharedDetector *BalanceDetector - sharedScheduler *BalanceScheduler -) - -// getSharedInstances returns the shared detector and scheduler instances -func getSharedInstances() (*BalanceDetector, *BalanceScheduler) { - if sharedDetector == nil { - sharedDetector = NewBalanceDetector() - } - if sharedScheduler == nil { - sharedScheduler = NewBalanceScheduler() - } - return sharedDetector, sharedScheduler -} - -// GetSharedInstances returns the shared detector and scheduler instances (public access) -func GetSharedInstances() (*BalanceDetector, *BalanceScheduler) { - return getSharedInstances() -} - // 
Auto-register this task when the package is imported func init() { - factory := NewFactory() - tasks.AutoRegister(types.TaskTypeBalance, factory) - - // Get shared instances for all registrations - detector, scheduler := getSharedInstances() - - // Register with types registry - tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { - registry.RegisterTask(detector, scheduler) - }) - - // Register with UI registry using the same instances - tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { - RegisterUI(uiRegistry, detector, scheduler) - }) + // Use new architecture instead of old registration + initBalanceV2() } diff --git a/weed/worker/tasks/balance/balance_scheduler.go b/weed/worker/tasks/balance/balance_scheduler.go deleted file mode 100644 index a8fefe465..000000000 --- a/weed/worker/tasks/balance/balance_scheduler.go +++ /dev/null @@ -1,197 +0,0 @@ -package balance - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// BalanceScheduler implements TaskScheduler for balance tasks -type BalanceScheduler struct { - enabled bool - maxConcurrent int - minInterval time.Duration - lastScheduled map[string]time.Time // track when we last scheduled a balance for each task type - minServerCount int - moveDuringOffHours bool - offHoursStart string - offHoursEnd string -} - -// Compile-time interface assertions -var ( - _ types.TaskScheduler = (*BalanceScheduler)(nil) -) - -// NewBalanceScheduler creates a new balance scheduler -func NewBalanceScheduler() *BalanceScheduler { - return &BalanceScheduler{ - enabled: true, - maxConcurrent: 1, // Only run one balance at a time - minInterval: 6 * time.Hour, - lastScheduled: make(map[string]time.Time), - minServerCount: 3, - moveDuringOffHours: true, - offHoursStart: "23:00", - offHoursEnd: "06:00", - } -} - -// GetTaskType returns the task type -func (s *BalanceScheduler) GetTaskType() types.TaskType { - return types.TaskTypeBalance -} - -// 
CanScheduleNow determines if a balance task can be scheduled -func (s *BalanceScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { - if !s.enabled { - return false - } - - // Count running balance tasks - runningBalanceCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeBalance { - runningBalanceCount++ - } - } - - // Check concurrency limit - if runningBalanceCount >= s.maxConcurrent { - glog.V(3).Infof("βΈοΈ Balance task blocked: too many running (%d >= %d)", runningBalanceCount, s.maxConcurrent) - return false - } - - // Check minimum interval between balance operations - if lastTime, exists := s.lastScheduled["balance"]; exists { - if time.Since(lastTime) < s.minInterval { - timeLeft := s.minInterval - time.Since(lastTime) - glog.V(3).Infof("βΈοΈ Balance task blocked: too soon (wait %v)", timeLeft) - return false - } - } - - // Check if we have available workers - availableWorkerCount := 0 - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeBalance { - availableWorkerCount++ - break - } - } - } - - if availableWorkerCount == 0 { - glog.V(3).Infof("βΈοΈ Balance task blocked: no available workers") - return false - } - - // All checks passed - can schedule - s.lastScheduled["balance"] = time.Now() - glog.V(2).Infof("β
Balance task can be scheduled (running: %d/%d, workers: %d)", - runningBalanceCount, s.maxConcurrent, availableWorkerCount) - return true -} - -// GetPriority returns the priority for balance tasks -func (s *BalanceScheduler) GetPriority(task *types.Task) types.TaskPriority { - // Balance is typically normal priority - not urgent but important for optimization - return types.TaskPriorityNormal -} - -// GetMaxConcurrent returns the maximum concurrent balance tasks -func (s *BalanceScheduler) GetMaxConcurrent() int { - return s.maxConcurrent -} - -// GetDefaultRepeatInterval returns the default interval to wait before repeating balance tasks -func (s *BalanceScheduler) GetDefaultRepeatInterval() time.Duration { - return s.minInterval -} - -// IsEnabled returns whether the scheduler is enabled -func (s *BalanceScheduler) IsEnabled() bool { - return s.enabled -} - -// SetEnabled sets whether the scheduler is enabled -func (s *BalanceScheduler) SetEnabled(enabled bool) { - s.enabled = enabled - glog.V(1).Infof("π Balance scheduler enabled: %v", enabled) -} - -// SetMaxConcurrent sets the maximum concurrent balance tasks -func (s *BalanceScheduler) SetMaxConcurrent(max int) { - s.maxConcurrent = max - glog.V(1).Infof("π Balance max concurrent set to: %d", max) -} - -// SetMinInterval sets the minimum interval between balance operations -func (s *BalanceScheduler) SetMinInterval(interval time.Duration) { - s.minInterval = interval - glog.V(1).Infof("π Balance minimum interval set to: %v", interval) -} - -// GetLastScheduled returns when we last scheduled this task type -func (s *BalanceScheduler) GetLastScheduled(taskKey string) time.Time { - if lastTime, exists := s.lastScheduled[taskKey]; exists { - return lastTime - } - return time.Time{} -} - -// SetLastScheduled updates when we last scheduled this task type -func (s *BalanceScheduler) SetLastScheduled(taskKey string, when time.Time) { - s.lastScheduled[taskKey] = when -} - -// GetMinServerCount returns the minimum 
server count -func (s *BalanceScheduler) GetMinServerCount() int { - return s.minServerCount -} - -// SetMinServerCount sets the minimum server count -func (s *BalanceScheduler) SetMinServerCount(count int) { - s.minServerCount = count - glog.V(1).Infof("π Balance minimum server count set to: %d", count) -} - -// GetMoveDuringOffHours returns whether to move only during off-hours -func (s *BalanceScheduler) GetMoveDuringOffHours() bool { - return s.moveDuringOffHours -} - -// SetMoveDuringOffHours sets whether to move only during off-hours -func (s *BalanceScheduler) SetMoveDuringOffHours(enabled bool) { - s.moveDuringOffHours = enabled - glog.V(1).Infof("π Balance move during off-hours: %v", enabled) -} - -// GetOffHoursStart returns the off-hours start time -func (s *BalanceScheduler) GetOffHoursStart() string { - return s.offHoursStart -} - -// SetOffHoursStart sets the off-hours start time -func (s *BalanceScheduler) SetOffHoursStart(start string) { - s.offHoursStart = start - glog.V(1).Infof("π Balance off-hours start time set to: %s", start) -} - -// GetOffHoursEnd returns the off-hours end time -func (s *BalanceScheduler) GetOffHoursEnd() string { - return s.offHoursEnd -} - -// SetOffHoursEnd sets the off-hours end time -func (s *BalanceScheduler) SetOffHoursEnd(end string) { - s.offHoursEnd = end - glog.V(1).Infof("π Balance off-hours end time set to: %s", end) -} - -// GetMinInterval returns the minimum interval -func (s *BalanceScheduler) GetMinInterval() time.Duration { - return s.minInterval -} diff --git a/weed/worker/tasks/balance/balance_v2.go b/weed/worker/tasks/balance/balance_v2.go new file mode 100644 index 000000000..cb66d26d8 --- /dev/null +++ b/weed/worker/tasks/balance/balance_v2.go @@ -0,0 +1,274 @@ +package balance + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// BalanceConfigV2 extends 
BaseConfig with balance-specific settings +type BalanceConfigV2 struct { + base.BaseConfig + ImbalanceThreshold float64 `json:"imbalance_threshold"` + MinServerCount int `json:"min_server_count"` +} + +// ToMap converts config to map (extend base functionality) +func (c *BalanceConfigV2) ToMap() map[string]interface{} { + result := c.BaseConfig.ToMap() + result["imbalance_threshold"] = c.ImbalanceThreshold + result["min_server_count"] = c.MinServerCount + return result +} + +// FromMap loads config from map (extend base functionality) +func (c *BalanceConfigV2) FromMap(data map[string]interface{}) error { + // Load base config first + if err := c.BaseConfig.FromMap(data); err != nil { + return err + } + + // Load balance-specific config + if threshold, ok := data["imbalance_threshold"].(float64); ok { + c.ImbalanceThreshold = threshold + } + if serverCount, ok := data["min_server_count"].(int); ok { + c.MinServerCount = serverCount + } + return nil +} + +// balanceDetection implements the detection logic for balance tasks +func balanceDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + balanceConfig := config.(*BalanceConfigV2) + + // Skip if cluster is too small + minVolumeCount := 10 + if len(metrics) < minVolumeCount { + return nil, nil + } + + // Analyze volume distribution across servers + serverVolumeCounts := make(map[string]int) + for _, metric := range metrics { + serverVolumeCounts[metric.Server]++ + } + + if len(serverVolumeCounts) < balanceConfig.MinServerCount { + return nil, nil + } + + // Calculate balance metrics + totalVolumes := len(metrics) + avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) + + maxVolumes := 0 + minVolumes := totalVolumes + maxServer := "" + minServer := "" + + for server, count := range serverVolumeCounts { + if count > maxVolumes { + maxVolumes = count + 
maxServer = server + } + if count < minVolumes { + minVolumes = count + minServer = server + } + } + + // Check if imbalance exceeds threshold + imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer + if imbalanceRatio <= balanceConfig.ImbalanceThreshold { + return nil, nil + } + + // Create balance task + reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", + imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + + task := &types.TaskDetectionResult{ + TaskType: types.TaskTypeBalance, + Priority: types.TaskPriorityNormal, + Reason: reason, + ScheduleAt: time.Now(), + Parameters: map[string]interface{}{ + "imbalance_ratio": imbalanceRatio, + "threshold": balanceConfig.ImbalanceThreshold, + "max_volumes": maxVolumes, + "min_volumes": minVolumes, + "avg_volumes_per_server": avgVolumesPerServer, + "max_server": maxServer, + "min_server": minServer, + "total_servers": len(serverVolumeCounts), + }, + } + + return []*types.TaskDetectionResult{task}, nil +} + +// balanceScheduling implements the scheduling logic for balance tasks +func balanceScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + balanceConfig := config.(*BalanceConfigV2) + + // Count running balance tasks + runningBalanceCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeBalance { + runningBalanceCount++ + } + } + + // Check concurrency limit + if runningBalanceCount >= balanceConfig.MaxConcurrent { + return false + } + + // Check if we have available workers + availableWorkerCount := 0 + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeBalance { + availableWorkerCount++ + break + } + } + } + + return availableWorkerCount > 0 +} + +// createBalanceTask creates a balance task instance +func createBalanceTask(params 
types.TaskParams) (types.TaskInterface, error) { + // Validate parameters + if params.VolumeID == 0 { + return nil, fmt.Errorf("volume_id is required") + } + if params.Server == "" { + return nil, fmt.Errorf("server is required") + } + + task := NewTask(params.Server, params.VolumeID, params.Collection) + task.SetEstimatedDuration(task.EstimateTime(params)) + return task, nil +} + +// getBalanceConfigSpec returns the configuration schema for balance tasks +func getBalanceConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Balance Tasks", + Description: "Whether balance tasks should be automatically created", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "imbalance_threshold", + JSONName: "imbalance_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.1, // 10% + MinValue: 0.01, + MaxValue: 0.5, + Required: true, + DisplayName: "Imbalance Threshold", + Description: "Trigger balance when storage imbalance exceeds this ratio", + Placeholder: "0.10 (10%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 6 * 60 * 60, // 6 hours + MinValue: 1 * 60 * 60, // 1 hour + MaxValue: 24 * 60 * 60, // 24 hours + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for imbalanced volumes", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 1, + MaxValue: 5, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of balance tasks that can run simultaneously", + Unit: config.UnitCount, + InputType: "number", + 
CSSClasses: "form-control", + }, + { + Name: "min_server_count", + JSONName: "min_server_count", + Type: config.FieldTypeInt, + DefaultValue: 3, + MinValue: 2, + MaxValue: 20, + Required: true, + DisplayName: "Minimum Server Count", + Description: "Only balance when at least this many servers are available", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + }, + } +} + +// initBalanceV2 registers the refactored balance task +func initBalanceV2() { + // Create configuration instance + config := &BalanceConfigV2{ + BaseConfig: base.BaseConfig{ + Enabled: false, // Conservative default + ScanIntervalSeconds: 6 * 60 * 60, // 6 hours + MaxConcurrent: 2, + }, + ImbalanceThreshold: 0.1, // 10% + MinServerCount: 3, + } + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeBalance, + Name: "balance", + DisplayName: "Volume Balance", + Description: "Redistributes volumes across volume servers to optimize storage utilization", + Icon: "fas fa-balance-scale text-secondary", + Capabilities: []string{"balance", "storage", "optimization"}, + + Config: config, + ConfigSpec: getBalanceConfigSpec(), + CreateTask: createBalanceTask, + DetectionFunc: balanceDetection, + ScanInterval: 6 * time.Hour, + SchedulingFunc: balanceScheduling, + MaxConcurrent: 2, + RepeatInterval: 12 * time.Hour, + } + + // Register everything with a single function call! 
+ base.RegisterTask(taskDef) +} diff --git a/weed/worker/tasks/balance/config_schema.go b/weed/worker/tasks/balance/config_schema.go deleted file mode 100644 index 91c5714d7..000000000 --- a/weed/worker/tasks/balance/config_schema.go +++ /dev/null @@ -1,88 +0,0 @@ -package balance - -import ( - "github.com/seaweedfs/seaweedfs/weed/admin/config" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" -) - -// GetConfigSchema returns the schema for balance task configuration -func GetConfigSchema() *tasks.TaskConfigSchema { - return &tasks.TaskConfigSchema{ - TaskName: "balance", - DisplayName: "Volume Balance", - Description: "Redistributes volumes across volume servers to optimize storage utilization", - Icon: "fas fa-balance-scale text-secondary", - Schema: config.Schema{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Balance Tasks", - Description: "Whether balance tasks should be automatically created", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "imbalance_threshold", - JSONName: "imbalance_threshold", - Type: config.FieldTypeFloat, - DefaultValue: 0.1, // 10% - MinValue: 0.01, - MaxValue: 0.5, - Required: true, - DisplayName: "Imbalance Threshold", - Description: "Trigger balance when storage imbalance exceeds this ratio", - Placeholder: "0.10 (10%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 6 * 60 * 60, // 6 hours - MinValue: 1 * 60 * 60, // 1 hour - MaxValue: 24 * 60 * 60, // 24 hours - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for imbalanced volumes", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: 
config.FieldTypeInt, - DefaultValue: 2, - MinValue: 1, - MaxValue: 5, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of balance tasks that can run simultaneously", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "min_server_count", - JSONName: "min_server_count", - Type: config.FieldTypeInt, - DefaultValue: 3, - MinValue: 2, - MaxValue: 20, - Required: true, - DisplayName: "Minimum Server Count", - Description: "Only balance when at least this many servers are available", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - }, - }, - } -} diff --git a/weed/worker/tasks/balance/schema_provider.go b/weed/worker/tasks/balance/schema_provider.go deleted file mode 100644 index a208148f0..000000000 --- a/weed/worker/tasks/balance/schema_provider.go +++ /dev/null @@ -1,18 +0,0 @@ -package balance - -import ( - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" -) - -// SchemaProvider implements the TaskConfigSchemaProvider interface for balance tasks -type SchemaProvider struct{} - -// GetConfigSchema returns the schema for balance task configuration -func (p *SchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema { - return GetConfigSchema() -} - -// init registers the balance schema provider -func init() { - tasks.RegisterTaskConfigSchema("balance", &SchemaProvider{}) -} diff --git a/weed/worker/tasks/balance/ui.go b/weed/worker/tasks/balance/ui.go deleted file mode 100644 index 6e34b68a2..000000000 --- a/weed/worker/tasks/balance/ui.go +++ /dev/null @@ -1,115 +0,0 @@ -package balance - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// BalanceConfig represents the balance configuration matching the schema -type BalanceConfig struct { - Enabled bool `json:"enabled"` - ImbalanceThreshold float64 
`json:"imbalance_threshold"` - ScanIntervalSeconds int `json:"scan_interval_seconds"` - MaxConcurrent int `json:"max_concurrent"` - MinServerCount int `json:"min_server_count"` -} - -// BalanceUILogic contains the business logic for balance UI operations -type BalanceUILogic struct { - detector *BalanceDetector - scheduler *BalanceScheduler -} - -// NewBalanceUILogic creates new balance UI logic -func NewBalanceUILogic(detector *BalanceDetector, scheduler *BalanceScheduler) *BalanceUILogic { - return &BalanceUILogic{ - detector: detector, - scheduler: scheduler, - } -} - -// GetCurrentConfig returns the current balance configuration -func (logic *BalanceUILogic) GetCurrentConfig() interface{} { - config := &BalanceConfig{ - // Default values from schema (matching task_config_schema.go) - Enabled: true, - ImbalanceThreshold: 0.1, // 10% - ScanIntervalSeconds: 6 * 60 * 60, // 6 hours - MaxConcurrent: 2, - MinServerCount: 3, - } - - // Get current values from detector - if logic.detector != nil { - config.Enabled = logic.detector.IsEnabled() - config.ImbalanceThreshold = logic.detector.GetThreshold() - config.ScanIntervalSeconds = int(logic.detector.ScanInterval().Seconds()) - } - - // Get current values from scheduler - if logic.scheduler != nil { - config.MaxConcurrent = logic.scheduler.GetMaxConcurrent() - config.MinServerCount = logic.scheduler.GetMinServerCount() - } - - return config -} - -// ApplyConfig applies the balance configuration -func (logic *BalanceUILogic) ApplyConfig(config interface{}) error { - balanceConfig, ok := config.(*BalanceConfig) - if !ok { - return fmt.Errorf("invalid configuration type for balance") - } - - // Apply to detector - if logic.detector != nil { - logic.detector.SetEnabled(balanceConfig.Enabled) - logic.detector.SetThreshold(balanceConfig.ImbalanceThreshold) - logic.detector.SetMinCheckInterval(time.Duration(balanceConfig.ScanIntervalSeconds) * time.Second) - } - - // Apply to scheduler - if logic.scheduler != nil { - 
logic.scheduler.SetEnabled(balanceConfig.Enabled) - logic.scheduler.SetMaxConcurrent(balanceConfig.MaxConcurrent) - logic.scheduler.SetMinServerCount(balanceConfig.MinServerCount) - } - - glog.V(1).Infof("Applied balance configuration: enabled=%v, threshold=%.1f%%, max_concurrent=%d, min_servers=%d", - balanceConfig.Enabled, balanceConfig.ImbalanceThreshold*100, balanceConfig.MaxConcurrent, - balanceConfig.MinServerCount) - - return nil -} - -// RegisterUI registers the balance UI provider with the UI registry -func RegisterUI(uiRegistry *types.UIRegistry, detector *BalanceDetector, scheduler *BalanceScheduler) { - logic := NewBalanceUILogic(detector, scheduler) - - tasks.CommonRegisterUI( - types.TaskTypeBalance, - "Volume Balance", - uiRegistry, - detector, - scheduler, - GetConfigSchema, - logic.GetCurrentConfig, - logic.ApplyConfig, - ) -} - -// DefaultBalanceConfig returns default balance configuration -func DefaultBalanceConfig() *BalanceConfig { - return &BalanceConfig{ - Enabled: false, - ImbalanceThreshold: 0.1, // 10% - ScanIntervalSeconds: 6 * 60 * 60, // 6 hours - MaxConcurrent: 2, - MinServerCount: 3, - } -} diff --git a/weed/worker/tasks/base/generic_components.go b/weed/worker/tasks/base/generic_components.go new file mode 100644 index 000000000..27ad1bb29 --- /dev/null +++ b/weed/worker/tasks/base/generic_components.go @@ -0,0 +1,129 @@ +package base + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// GenericDetector implements TaskDetector using function-based logic +type GenericDetector struct { + taskDef *TaskDefinition +} + +// NewGenericDetector creates a detector from a task definition +func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector { + return &GenericDetector{taskDef: taskDef} +} + +// GetTaskType returns the task type +func (d *GenericDetector) GetTaskType() types.TaskType { + return d.taskDef.Type +} + +// ScanForTasks scans using the task definition's detection function +func (d 
*GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { + if d.taskDef.DetectionFunc == nil { + return nil, nil + } + return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config) +} + +// ScanInterval returns the scan interval from task definition +func (d *GenericDetector) ScanInterval() time.Duration { + if d.taskDef.ScanInterval > 0 { + return d.taskDef.ScanInterval + } + return 30 * time.Minute // Default +} + +// IsEnabled returns whether this detector is enabled +func (d *GenericDetector) IsEnabled() bool { + return d.taskDef.Config.IsEnabled() +} + +// GenericScheduler implements TaskScheduler using function-based logic +type GenericScheduler struct { + taskDef *TaskDefinition +} + +// NewGenericScheduler creates a scheduler from a task definition +func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler { + return &GenericScheduler{taskDef: taskDef} +} + +// GetTaskType returns the task type +func (s *GenericScheduler) GetTaskType() types.TaskType { + return s.taskDef.Type +} + +// CanScheduleNow determines if a task can be scheduled using the task definition's function +func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { + if s.taskDef.SchedulingFunc == nil { + return s.defaultCanSchedule(task, runningTasks, availableWorkers) + } + return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config) +} + +// defaultCanSchedule provides default scheduling logic +func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { + if !s.taskDef.Config.IsEnabled() { + return false + } + + // Count running tasks of this type + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == s.taskDef.Type { + runningCount++ + } + } + + // Check concurrency limit 
+ maxConcurrent := s.taskDef.MaxConcurrent + if maxConcurrent <= 0 { + maxConcurrent = 1 // Default + } + if runningCount >= maxConcurrent { + return false + } + + // Check if we have available workers + for _, worker := range availableWorkers { + if worker.CurrentLoad < worker.MaxConcurrent { + for _, capability := range worker.Capabilities { + if capability == s.taskDef.Type { + return true + } + } + } + } + + return false +} + +// GetPriority returns the priority for this task +func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority { + return task.Priority +} + +// GetMaxConcurrent returns max concurrent tasks +func (s *GenericScheduler) GetMaxConcurrent() int { + if s.taskDef.MaxConcurrent > 0 { + return s.taskDef.MaxConcurrent + } + return 1 // Default +} + +// GetDefaultRepeatInterval returns the default repeat interval +func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration { + if s.taskDef.RepeatInterval > 0 { + return s.taskDef.RepeatInterval + } + return 24 * time.Hour // Default +} + +// IsEnabled returns whether this scheduler is enabled +func (s *GenericScheduler) IsEnabled() bool { + return s.taskDef.Config.IsEnabled() +} diff --git a/weed/worker/tasks/base/registration.go b/weed/worker/tasks/base/registration.go new file mode 100644 index 000000000..6a91feac1 --- /dev/null +++ b/weed/worker/tasks/base/registration.go @@ -0,0 +1,131 @@ +package base + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// GenericFactory creates task instances using a TaskDefinition +type GenericFactory struct { + *tasks.BaseTaskFactory + taskDef *TaskDefinition +} + +// NewGenericFactory creates a generic task factory +func NewGenericFactory(taskDef *TaskDefinition) *GenericFactory { + return &GenericFactory{ + BaseTaskFactory: tasks.NewBaseTaskFactory( + 
taskDef.Type, + taskDef.Capabilities, + taskDef.Description, + ), + taskDef: taskDef, + } +} + +// Create creates a task instance using the task definition +func (f *GenericFactory) Create(params types.TaskParams) (types.TaskInterface, error) { + if f.taskDef.CreateTask == nil { + return nil, fmt.Errorf("no task creation function defined for %s", f.taskDef.Type) + } + return f.taskDef.CreateTask(params) +} + +// GenericSchemaProvider provides config schema from TaskDefinition +type GenericSchemaProvider struct { + taskDef *TaskDefinition +} + +// GetConfigSchema returns the schema from task definition +func (p *GenericSchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema { + return &tasks.TaskConfigSchema{ + TaskName: string(p.taskDef.Type), + DisplayName: p.taskDef.DisplayName, + Description: p.taskDef.Description, + Icon: p.taskDef.Icon, + Schema: config.Schema{ + Fields: p.taskDef.ConfigSpec.Fields, + }, + } +} + +// GenericUIProvider provides UI functionality from TaskDefinition +type GenericUIProvider struct { + taskDef *TaskDefinition +} + +// GetCurrentConfig returns current config as interface{} +func (ui *GenericUIProvider) GetCurrentConfig() interface{} { + return ui.taskDef.Config +} + +// ApplyConfig applies configuration +func (ui *GenericUIProvider) ApplyConfig(config interface{}) error { + if configMap, ok := config.(map[string]interface{}); ok { + return ui.taskDef.Config.FromMap(configMap) + } + return fmt.Errorf("invalid config format for %s", ui.taskDef.Type) +} + +// RegisterTask registers a complete task definition with all registries +func RegisterTask(taskDef *TaskDefinition) { + // Validate task definition + if err := validateTaskDefinition(taskDef); err != nil { + glog.Errorf("Invalid task definition for %s: %v", taskDef.Type, err) + return + } + + // Create and register factory + factory := NewGenericFactory(taskDef) + tasks.AutoRegister(taskDef.Type, factory) + + // Create and register detector/scheduler + detector := 
NewGenericDetector(taskDef) + scheduler := NewGenericScheduler(taskDef) + + tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { + registry.RegisterTask(detector, scheduler) + }) + + // Create and register schema provider + schemaProvider := &GenericSchemaProvider{taskDef: taskDef} + tasks.RegisterTaskConfigSchema(string(taskDef.Type), schemaProvider) + + // Create and register UI provider + uiProvider := &GenericUIProvider{taskDef: taskDef} + tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { + baseUIProvider := tasks.NewBaseUIProvider( + taskDef.Type, + taskDef.DisplayName, + taskDef.Description, + taskDef.Icon, + schemaProvider.GetConfigSchema, + uiProvider.GetCurrentConfig, + uiProvider.ApplyConfig, + ) + uiRegistry.RegisterUI(baseUIProvider) + }) + + glog.V(1).Infof("✅ Registered complete task definition: %s", taskDef.Type) +} + +// validateTaskDefinition ensures the task definition is complete +func validateTaskDefinition(taskDef *TaskDefinition) error { + if taskDef.Type == "" { + return fmt.Errorf("task type is required") + } + if taskDef.Name == "" { + return fmt.Errorf("task name is required") + } + if taskDef.Config == nil { + return fmt.Errorf("task config is required") + } + if taskDef.CreateTask == nil { + return fmt.Errorf("task creation function is required") + } + return nil +} diff --git a/weed/worker/tasks/base/task_definition.go b/weed/worker/tasks/base/task_definition.go new file mode 100644 index 000000000..c32471664 --- /dev/null +++ b/weed/worker/tasks/base/task_definition.go @@ -0,0 +1,95 @@ +package base + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// TaskDefinition encapsulates everything needed to define a complete task type +type TaskDefinition struct { + // Basic task information + Type types.TaskType + Name string + DisplayName string + Description string + Icon string + Capabilities []string + + // Task configuration + Config TaskConfig + ConfigSpec ConfigSpec + + // Task creation + CreateTask func(params types.TaskParams) (types.TaskInterface, error) + + // Detection logic + DetectionFunc func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config TaskConfig) ([]*types.TaskDetectionResult, error) + ScanInterval time.Duration + + // Scheduling logic + SchedulingFunc func(task *types.Task, running []*types.Task, workers []*types.Worker, config TaskConfig) bool + MaxConcurrent int + RepeatInterval time.Duration +} + +// TaskConfig provides a simple configuration interface +type TaskConfig interface { + IsEnabled() bool + SetEnabled(bool) + Validate() error + ToMap() map[string]interface{} + FromMap(map[string]interface{}) error +} + +// ConfigSpec defines the configuration schema +type ConfigSpec 
struct { + Fields []*config.Field +} + +// BaseConfig provides common configuration fields +type BaseConfig struct { + Enabled bool `json:"enabled"` + ScanIntervalSeconds int `json:"scan_interval_seconds"` + MaxConcurrent int `json:"max_concurrent"` +} + +// IsEnabled returns whether the task is enabled +func (c *BaseConfig) IsEnabled() bool { + return c.Enabled +} + +// SetEnabled sets whether the task is enabled +func (c *BaseConfig) SetEnabled(enabled bool) { + c.Enabled = enabled +} + +// Validate validates the base configuration +func (c *BaseConfig) Validate() error { + // Common validation logic + return nil +} + +// ToMap converts config to map +func (c *BaseConfig) ToMap() map[string]interface{} { + return map[string]interface{}{ + "enabled": c.Enabled, + "scan_interval_seconds": c.ScanIntervalSeconds, + "max_concurrent": c.MaxConcurrent, + } +} + +// FromMap loads config from map +func (c *BaseConfig) FromMap(data map[string]interface{}) error { + if enabled, ok := data["enabled"].(bool); ok { + c.Enabled = enabled + } + if interval, ok := data["scan_interval_seconds"].(int); ok { + c.ScanIntervalSeconds = interval + } + if concurrent, ok := data["max_concurrent"].(int); ok { + c.MaxConcurrent = concurrent + } + return nil +} diff --git a/weed/worker/tasks/erasure_coding/config_schema.go b/weed/worker/tasks/erasure_coding/config_schema.go deleted file mode 100644 index 24b96137c..000000000 --- a/weed/worker/tasks/erasure_coding/config_schema.go +++ /dev/null @@ -1,100 +0,0 @@ -package erasure_coding - -import ( - "github.com/seaweedfs/seaweedfs/weed/admin/config" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" -) - -// GetConfigSchema returns the schema for erasure coding task configuration -func GetConfigSchema() *tasks.TaskConfigSchema { - return &tasks.TaskConfigSchema{ - TaskName: "erasure_coding", - DisplayName: "Erasure Coding", - Description: "Converts volumes to erasure coded format for improved data durability", - Icon: "fas fa-shield-alt 
text-info", - Schema: config.Schema{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Erasure Coding Tasks", - Description: "Whether erasure coding tasks should be automatically created", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "quiet_for_seconds", - JSONName: "quiet_for_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 7 * 24 * 60 * 60, // 7 days - MinValue: 1 * 24 * 60 * 60, // 1 day - MaxValue: 30 * 24 * 60 * 60, // 30 days - Required: true, - DisplayName: "Quiet For Duration", - Description: "Only apply erasure coding to volumes that have not been modified for this duration", - Unit: config.UnitDays, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 12 * 60 * 60, // 12 hours - MinValue: 2 * 60 * 60, // 2 hours - MaxValue: 24 * 60 * 60, // 24 hours - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for volumes needing erasure coding", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 1, - MinValue: 1, - MaxValue: 3, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of erasure coding tasks that can run simultaneously", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "fullness_ratio", - JSONName: "fullness_ratio", - Type: config.FieldTypeFloat, - DefaultValue: 0.9, // 90% - MinValue: 0.5, - MaxValue: 1.0, - Required: true, - DisplayName: "Fullness Ratio", - Description: "Only apply erasure coding to volumes with fullness ratio above this threshold", - Placeholder: "0.90 (90%)", - Unit: config.UnitNone, - InputType: 
"number", - CSSClasses: "form-control", - }, - { - Name: "collection_filter", - JSONName: "collection_filter", - Type: config.FieldTypeString, - DefaultValue: "", - Required: false, - DisplayName: "Collection Filter", - Description: "Only apply erasure coding to volumes in these collections (comma-separated, leave empty for all)", - Placeholder: "collection1,collection2", - InputType: "text", - CSSClasses: "form-control", - }, - }, - }, - } -} diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go deleted file mode 100644 index 5fd13845f..000000000 --- a/weed/worker/tasks/erasure_coding/ec_detector.go +++ /dev/null @@ -1,226 +0,0 @@ -package erasure_coding - -import ( - "fmt" - "strings" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// EcDetector implements erasure coding task detection -type EcDetector struct { - enabled bool - quietForSeconds int - fullnessRatio float64 - minSizeMB int // Minimum volume size in MB before considering EC - scanInterval time.Duration - collectionFilter string -} - -// Compile-time interface assertions -var ( - _ types.TaskDetector = (*EcDetector)(nil) - _ types.PolicyConfigurableDetector = (*EcDetector)(nil) -) - -// NewEcDetector creates a new erasure coding detector with production defaults -func NewEcDetector() *EcDetector { - return &EcDetector{ - enabled: false, // Conservative default - enable via configuration - quietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period - fullnessRatio: 0.90, // 90% full threshold - minSizeMB: 100, // Minimum 100MB volume size - scanInterval: 12 * time.Hour, // Scan every 12 hours - collectionFilter: "", // No collection filter by default - } -} - -// GetTaskType returns the task type -func (d *EcDetector) GetTaskType() types.TaskType { - return types.TaskTypeErasureCoding -} - -// ScanForTasks scans for volumes that should be converted to erasure coding -func (d 
*EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { - if !d.enabled { - glog.V(2).Infof("EC detector is disabled") - return nil, nil - } - - var results []*types.TaskDetectionResult - now := time.Now() - quietThreshold := time.Duration(d.quietForSeconds) * time.Second - minSizeBytes := uint64(d.minSizeMB) * 1024 * 1024 - - glog.V(2).Infof("EC detector scanning %d volumes with thresholds: quietFor=%ds, fullness=%.2f, minSize=%dMB", - len(volumeMetrics), d.quietForSeconds, d.fullnessRatio, d.minSizeMB) - - for _, metric := range volumeMetrics { - // Skip if already EC volume - if metric.IsECVolume { - continue - } - - // Check minimum size requirement - if metric.Size < minSizeBytes { - continue - } - - // Check collection filter if specified - if d.collectionFilter != "" { - // Parse comma-separated collections - allowedCollections := make(map[string]bool) - for _, collection := range strings.Split(d.collectionFilter, ",") { - allowedCollections[strings.TrimSpace(collection)] = true - } - // Skip if volume's collection is not in the allowed list - if !allowedCollections[metric.Collection] { - continue - } - } - - // Check quiet duration and fullness criteria - if metric.Age >= quietThreshold && metric.FullnessRatio >= d.fullnessRatio { - // Note: Removed read-only requirement for testing - // In production, you might want to enable this: - // if !metric.IsReadOnly { - // continue - // } - - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeErasureCoding, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: types.TaskPriorityLow, // EC is not urgent - Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)", - metric.Age.Seconds(), d.quietForSeconds, metric.FullnessRatio*100, d.fullnessRatio*100, - float64(metric.Size)/(1024*1024), d.minSizeMB), - Parameters: 
map[string]interface{}{ - "age_seconds": int(metric.Age.Seconds()), - "fullness_ratio": metric.FullnessRatio, - "size_mb": int(metric.Size / (1024 * 1024)), - }, - ScheduleAt: now, - } - results = append(results, result) - - glog.V(1).Infof("EC task detected for volume %d on %s: %s", metric.VolumeID, metric.Server, result.Reason) - } - } - - glog.V(1).Infof("EC detector found %d tasks to schedule", len(results)) - return results, nil -} - -// ScanInterval returns how often this task type should be scanned -func (d *EcDetector) ScanInterval() time.Duration { - return d.scanInterval -} - -// IsEnabled returns whether this task type is enabled -func (d *EcDetector) IsEnabled() bool { - return d.enabled -} - -// Configuration methods for runtime configuration - -// Configure sets detector configuration from policy -func (d *EcDetector) Configure(config map[string]interface{}) error { - if enabled, ok := config["enabled"].(bool); ok { - d.enabled = enabled - } - - if ageSeconds, ok := config["quiet_for_seconds"].(float64); ok { - d.quietForSeconds = int(ageSeconds) - } - - if fullnessRatio, ok := config["fullness_ratio"].(float64); ok { - d.fullnessRatio = fullnessRatio - } - - if minSizeMB, ok := config["min_size_mb"].(float64); ok { - d.minSizeMB = int(minSizeMB) - } - - if collectionFilter, ok := config["collection_filter"].(string); ok { - d.collectionFilter = collectionFilter - } - - glog.V(1).Infof("EC detector configured: enabled=%v, quietFor=%ds, fullness=%.2f, minSize=%dMB, collection_filter='%s'", - d.enabled, d.quietForSeconds, d.fullnessRatio, d.minSizeMB, d.collectionFilter) - - return nil -} - -// SetEnabled sets whether the detector is enabled -func (d *EcDetector) SetEnabled(enabled bool) { - d.enabled = enabled -} - -// SetQuietForSeconds sets the quiet duration threshold in seconds -func (d *EcDetector) SetQuietForSeconds(seconds int) { - d.quietForSeconds = seconds -} - -// SetFullnessRatio sets the fullness ratio threshold -func (d *EcDetector) 
SetFullnessRatio(ratio float64) { - d.fullnessRatio = ratio -} - -// SetCollectionFilter sets the collection filter -func (d *EcDetector) SetCollectionFilter(filter string) { - d.collectionFilter = filter -} - -// SetScanInterval sets the scan interval -func (d *EcDetector) SetScanInterval(interval time.Duration) { - d.scanInterval = interval -} - -// GetQuietForSeconds returns the current quiet duration threshold in seconds -func (d *EcDetector) GetQuietForSeconds() int { - return d.quietForSeconds -} - -// GetFullnessRatio returns the current fullness ratio threshold -func (d *EcDetector) GetFullnessRatio() float64 { - return d.fullnessRatio -} - -// GetCollectionFilter returns the current collection filter -func (d *EcDetector) GetCollectionFilter() string { - return d.collectionFilter -} - -// GetScanInterval returns the scan interval -func (d *EcDetector) GetScanInterval() time.Duration { - return d.scanInterval -} - -// ConfigureFromPolicy configures the detector from maintenance policy -func (d *EcDetector) ConfigureFromPolicy(policy interface{}) { - // Cast policy to maintenance policy type - if maintenancePolicy, ok := policy.(*types.MaintenancePolicy); ok { - // Get EC-specific configuration from policy - ecConfig := maintenancePolicy.GetTaskConfig(types.TaskTypeErasureCoding) - - if ecConfig != nil { - // Convert to map for easier access - if configMap, ok := ecConfig.(map[string]interface{}); ok { - d.Configure(configMap) - } else { - glog.Warningf("EC detector policy configuration is not a map: %T", ecConfig) - } - } else { - // No specific configuration found, use defaults with policy-based enabled status - enabled := maintenancePolicy.GlobalSettings != nil && maintenancePolicy.GlobalSettings.MaintenanceEnabled - glog.V(2).Infof("No EC-specific config found, using default with enabled=%v", enabled) - d.enabled = enabled - } - } else { - glog.Warningf("ConfigureFromPolicy received unknown policy type: %T", policy) - } -} diff --git 
a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go index 4fe7e47bf..f8aed927f 100644 --- a/weed/worker/tasks/erasure_coding/ec_register.go +++ b/weed/worker/tasks/erasure_coding/ec_register.go @@ -1,90 +1,7 @@ package erasure_coding -import ( - "fmt" - - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Factory creates erasure coding task instances -type Factory struct { - *tasks.BaseTaskFactory -} - -// NewFactory creates a new erasure coding task factory -func NewFactory() *Factory { - return &Factory{ - BaseTaskFactory: tasks.NewBaseTaskFactory( - types.TaskTypeErasureCoding, - []string{"erasure_coding", "storage", "durability"}, - "Convert volumes to erasure coded format for improved durability", - ), - } -} - -// Create creates a new erasure coding task instance -func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - // Extract additional parameters for comprehensive EC - masterClient := "localhost:9333" // Default master client - workDir := "/tmp/seaweedfs_ec_work" // Default work directory - - if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" { - masterClient = mc - } - if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" { - workDir = wd - } - - // Create EC task with comprehensive capabilities - task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir) - - // Set gRPC dial option if provided - if params.GrpcDialOption != nil { - task.SetDialOption(params.GrpcDialOption) - } - - task.SetEstimatedDuration(task.EstimateTime(params)) - - return task, nil -} - -// getSharedInstances returns shared detector and scheduler instances -func getSharedInstances() (*EcDetector, 
*Scheduler) { - // Create shared instances (singleton pattern) - detector := NewEcDetector() - scheduler := NewScheduler() - return detector, scheduler -} - -// GetSharedInstances returns the shared detector and scheduler instances (public API) -func GetSharedInstances() (*EcDetector, *Scheduler) { - return getSharedInstances() -} - // Auto-register this task when the package is imported func init() { - factory := NewFactory() - tasks.AutoRegister(types.TaskTypeErasureCoding, factory) - - // Get shared instances for all registrations - detector, scheduler := getSharedInstances() - - // Register with types registry - tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { - registry.RegisterTask(detector, scheduler) - }) - - // Register with UI registry using the same instances - tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { - RegisterUI(uiRegistry, detector, scheduler) - }) + // Use new architecture instead of old registration + initErasureCodingV2() } diff --git a/weed/worker/tasks/erasure_coding/ec_scheduler.go b/weed/worker/tasks/erasure_coding/ec_scheduler.go deleted file mode 100644 index d74c4c6ae..000000000 --- a/weed/worker/tasks/erasure_coding/ec_scheduler.go +++ /dev/null @@ -1,95 +0,0 @@ -package erasure_coding - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Scheduler implements erasure coding task scheduling -type Scheduler struct { - maxConcurrent int - enabled bool -} - -// NewScheduler creates a new erasure coding scheduler -func NewScheduler() *Scheduler { - return &Scheduler{ - maxConcurrent: 1, // Conservative default - enabled: false, // Conservative default - } -} - -// GetTaskType returns the task type -func (s *Scheduler) GetTaskType() types.TaskType { - return types.TaskTypeErasureCoding -} - -// CanScheduleNow determines if an erasure coding task can be scheduled now -func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks 
[]*types.Task, availableWorkers []*types.Worker) bool { - if !s.enabled { - return false - } - - // Check if we have available workers - if len(availableWorkers) == 0 { - return false - } - - // Count running EC tasks - runningCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeErasureCoding { - runningCount++ - } - } - - // Check concurrency limit - if runningCount >= s.maxConcurrent { - glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent) - return false - } - - // Check if any worker can handle EC tasks - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeErasureCoding { - glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID) - return true - } - } - } - - return false -} - -// GetMaxConcurrent returns the maximum number of concurrent tasks -func (s *Scheduler) GetMaxConcurrent() int { - return s.maxConcurrent -} - -// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks -func (s *Scheduler) GetDefaultRepeatInterval() time.Duration { - return 24 * time.Hour // Don't repeat EC for 24 hours -} - -// GetPriority returns the priority for this task -func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority { - return types.TaskPriorityLow // EC is not urgent -} - -// IsEnabled returns whether this task type is enabled -func (s *Scheduler) IsEnabled() bool { - return s.enabled -} - -// Configuration setters - -func (s *Scheduler) SetEnabled(enabled bool) { - s.enabled = enabled -} - -func (s *Scheduler) SetMaxConcurrent(max int) { - s.maxConcurrent = max -} diff --git a/weed/worker/tasks/erasure_coding/ec_v2.go b/weed/worker/tasks/erasure_coding/ec_v2.go new file mode 100644 index 000000000..ca2b28768 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/ec_v2.go @@ -0,0 +1,301 @@ +package erasure_coding + +import ( + "fmt" + "strings" + "time" 
+ + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// ErasureCodingConfigV2 extends BaseConfig with erasure coding specific settings +type ErasureCodingConfigV2 struct { + base.BaseConfig + QuietForSeconds int `json:"quiet_for_seconds"` + FullnessRatio float64 `json:"fullness_ratio"` + CollectionFilter string `json:"collection_filter"` +} + +// ToMap converts config to map (extend base functionality) +func (c *ErasureCodingConfigV2) ToMap() map[string]interface{} { + result := c.BaseConfig.ToMap() + result["quiet_for_seconds"] = c.QuietForSeconds + result["fullness_ratio"] = c.FullnessRatio + result["collection_filter"] = c.CollectionFilter + return result +} + +// FromMap loads config from map (extend base functionality) +func (c *ErasureCodingConfigV2) FromMap(data map[string]interface{}) error { + // Load base config first + if err := c.BaseConfig.FromMap(data); err != nil { + return err + } + + // Load erasure coding specific config + if quietFor, ok := data["quiet_for_seconds"].(int); ok { + c.QuietForSeconds = quietFor + } + if fullness, ok := data["fullness_ratio"].(float64); ok { + c.FullnessRatio = fullness + } + if filter, ok := data["collection_filter"].(string); ok { + c.CollectionFilter = filter + } + return nil +} + +// ecDetection implements the detection logic for erasure coding tasks +func ecDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + ecConfig := config.(*ErasureCodingConfigV2) + var results []*types.TaskDetectionResult + now := time.Now() + quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second + minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum + + for _, metric := range metrics { + // Skip if already EC volume + if metric.IsECVolume { + continue + } + 
+ // Check minimum size requirement + if metric.Size < minSizeBytes { + continue + } + + // Check collection filter if specified + if ecConfig.CollectionFilter != "" { + // Parse comma-separated collections + allowedCollections := make(map[string]bool) + for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") { + allowedCollections[strings.TrimSpace(collection)] = true + } + // Skip if volume's collection is not in the allowed list + if !allowedCollections[metric.Collection] { + continue + } + } + + // Check quiet duration and fullness criteria + if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeErasureCoding, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: types.TaskPriorityLow, // EC is not urgent + Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)", + metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100, + float64(metric.Size)/(1024*1024)), + Parameters: map[string]interface{}{ + "age_seconds": int(metric.Age.Seconds()), + "fullness_ratio": metric.FullnessRatio, + "size_mb": int(metric.Size / (1024 * 1024)), + }, + ScheduleAt: now, + } + results = append(results, result) + } + } + + return results, nil +} + +// ecScheduling implements the scheduling logic for erasure coding tasks +func ecScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + ecConfig := config.(*ErasureCodingConfigV2) + + // Check if we have available workers + if len(availableWorkers) == 0 { + return false + } + + // Count running EC tasks + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeErasureCoding { + runningCount++ + } + } + + // Check concurrency limit + if runningCount >= 
ecConfig.MaxConcurrent { + return false + } + + // Check if any worker can handle EC tasks + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeErasureCoding { + return true + } + } + } + + return false +} + +// createErasureCodingTask creates an erasure coding task instance +func createErasureCodingTask(params types.TaskParams) (types.TaskInterface, error) { + // Validate parameters + if params.VolumeID == 0 { + return nil, fmt.Errorf("volume_id is required") + } + if params.Server == "" { + return nil, fmt.Errorf("server is required") + } + + // Extract additional parameters for comprehensive EC + masterClient := "localhost:9333" // Default master client + workDir := "/tmp/seaweedfs_ec_work" // Default work directory + + if mc, ok := params.Parameters["master_client"].(string); ok && mc != "" { + masterClient = mc + } + if wd, ok := params.Parameters["work_dir"].(string); ok && wd != "" { + workDir = wd + } + + // Create EC task with comprehensive capabilities + task := NewTaskWithParams(params.Server, params.VolumeID, masterClient, workDir) + + // Set gRPC dial option if provided + if params.GrpcDialOption != nil { + task.SetDialOption(params.GrpcDialOption) + } + + task.SetEstimatedDuration(task.EstimateTime(params)) + return task, nil +} + +// getErasureCodingConfigSpec returns the configuration schema for erasure coding tasks +func getErasureCodingConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Erasure Coding Tasks", + Description: "Whether erasure coding tasks should be automatically created", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "quiet_for_seconds", + JSONName: "quiet_for_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 7 * 24 * 60 * 60, // 7 days + MinValue: 1 
* 24 * 60 * 60, // 1 day + MaxValue: 30 * 24 * 60 * 60, // 30 days + Required: true, + DisplayName: "Quiet For Duration", + Description: "Only apply erasure coding to volumes that have not been modified for this duration", + Unit: config.UnitDays, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 12 * 60 * 60, // 12 hours + MinValue: 2 * 60 * 60, // 2 hours + MaxValue: 24 * 60 * 60, // 24 hours + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing erasure coding", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 1, + MinValue: 1, + MaxValue: 3, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of erasure coding tasks that can run simultaneously", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "fullness_ratio", + JSONName: "fullness_ratio", + Type: config.FieldTypeFloat, + DefaultValue: 0.9, // 90% + MinValue: 0.5, + MaxValue: 1.0, + Required: true, + DisplayName: "Fullness Ratio", + Description: "Only apply erasure coding to volumes with fullness ratio above this threshold", + Placeholder: "0.90 (90%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "collection_filter", + JSONName: "collection_filter", + Type: config.FieldTypeString, + DefaultValue: "", + Required: false, + DisplayName: "Collection Filter", + Description: "Only apply erasure coding to volumes in these collections (comma-separated, leave empty for all)", + Placeholder: "collection1,collection2", + InputType: "text", + CSSClasses: "form-control", + }, + }, + } +} + +// initErasureCodingV2 registers the refactored erasure coding task +func 
initErasureCodingV2() { + // Create configuration instance + config := &ErasureCodingConfigV2{ + BaseConfig: base.BaseConfig{ + Enabled: false, // Conservative default - enable via configuration + ScanIntervalSeconds: 12 * 60 * 60, // 12 hours + MaxConcurrent: 1, // Conservative default + }, + QuietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period + FullnessRatio: 0.90, // 90% full threshold + CollectionFilter: "", // No collection filter by default + } + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeErasureCoding, + Name: "erasure_coding", + DisplayName: "Erasure Coding", + Description: "Converts volumes to erasure coded format for improved data durability", + Icon: "fas fa-shield-alt text-info", + Capabilities: []string{"erasure_coding", "storage", "durability"}, + + Config: config, + ConfigSpec: getErasureCodingConfigSpec(), + CreateTask: createErasureCodingTask, + DetectionFunc: ecDetection, + ScanInterval: 12 * time.Hour, + SchedulingFunc: ecScheduling, + MaxConcurrent: 1, + RepeatInterval: 24 * time.Hour, + } + + // Register everything with a single function call! 
+ base.RegisterTask(taskDef) +} diff --git a/weed/worker/tasks/erasure_coding/schema_provider.go b/weed/worker/tasks/erasure_coding/schema_provider.go deleted file mode 100644 index 0940b6c85..000000000 --- a/weed/worker/tasks/erasure_coding/schema_provider.go +++ /dev/null @@ -1,18 +0,0 @@ -package erasure_coding - -import ( - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" -) - -// SchemaProvider implements the TaskConfigSchemaProvider interface for erasure coding tasks -type SchemaProvider struct{} - -// GetConfigSchema returns the schema for erasure coding task configuration -func (p *SchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema { - return GetConfigSchema() -} - -// init registers the erasure coding schema provider -func init() { - tasks.RegisterTaskConfigSchema("erasure_coding", &SchemaProvider{}) -} diff --git a/weed/worker/tasks/erasure_coding/ui.go b/weed/worker/tasks/erasure_coding/ui.go deleted file mode 100644 index 2ab338722..000000000 --- a/weed/worker/tasks/erasure_coding/ui.go +++ /dev/null @@ -1,107 +0,0 @@ -package erasure_coding - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// ErasureCodingConfig represents the erasure coding configuration matching the schema -type ErasureCodingConfig struct { - Enabled bool `json:"enabled"` - QuietForSeconds int `json:"quiet_for_seconds"` - ScanIntervalSeconds int `json:"scan_interval_seconds"` - MaxConcurrent int `json:"max_concurrent"` - FullnessRatio float64 `json:"fullness_ratio"` - CollectionFilter string `json:"collection_filter"` -} - -// ErasureCodingUILogic contains the business logic for erasure coding UI operations -type ErasureCodingUILogic struct { - detector *EcDetector - scheduler *Scheduler -} - -// NewErasureCodingUILogic creates new erasure coding UI logic -func NewErasureCodingUILogic(detector *EcDetector, scheduler *Scheduler) 
*ErasureCodingUILogic { - return &ErasureCodingUILogic{ - detector: detector, - scheduler: scheduler, - } -} - -// GetCurrentConfig returns the current erasure coding configuration -func (logic *ErasureCodingUILogic) GetCurrentConfig() interface{} { - config := ErasureCodingConfig{ - // Default values from schema (matching task_config_schema.go) - Enabled: true, - QuietForSeconds: 7 * 24 * 60 * 60, // 7 days - ScanIntervalSeconds: 12 * 60 * 60, // 12 hours - MaxConcurrent: 1, - FullnessRatio: 0.9, // 90% - CollectionFilter: "", - } - - // Get current values from detector - if logic.detector != nil { - config.Enabled = logic.detector.IsEnabled() - config.QuietForSeconds = logic.detector.GetQuietForSeconds() - config.FullnessRatio = logic.detector.GetFullnessRatio() - config.CollectionFilter = logic.detector.GetCollectionFilter() - config.ScanIntervalSeconds = int(logic.detector.ScanInterval().Seconds()) - } - - // Get current values from scheduler - if logic.scheduler != nil { - config.MaxConcurrent = logic.scheduler.GetMaxConcurrent() - } - - return config -} - -// ApplyConfig applies the erasure coding configuration -func (logic *ErasureCodingUILogic) ApplyConfig(config interface{}) error { - ecConfig, ok := config.(ErasureCodingConfig) - if !ok { - return fmt.Errorf("invalid configuration type for erasure coding") - } - - // Apply to detector - if logic.detector != nil { - logic.detector.SetEnabled(ecConfig.Enabled) - logic.detector.SetQuietForSeconds(ecConfig.QuietForSeconds) - logic.detector.SetFullnessRatio(ecConfig.FullnessRatio) - logic.detector.SetCollectionFilter(ecConfig.CollectionFilter) - logic.detector.SetScanInterval(time.Duration(ecConfig.ScanIntervalSeconds) * time.Second) - } - - // Apply to scheduler - if logic.scheduler != nil { - logic.scheduler.SetEnabled(ecConfig.Enabled) - logic.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent) - } - - glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, quiet_for=%v seconds, max_concurrent=%d, 
fullness_ratio=%f, collection_filter=%s", - ecConfig.Enabled, ecConfig.QuietForSeconds, ecConfig.MaxConcurrent, ecConfig.FullnessRatio, ecConfig.CollectionFilter) - - return nil -} - -// RegisterUI registers the erasure coding UI provider with the UI registry -func RegisterUI(uiRegistry *types.UIRegistry, detector *EcDetector, scheduler *Scheduler) { - logic := NewErasureCodingUILogic(detector, scheduler) - - tasks.CommonRegisterUI( - types.TaskTypeErasureCoding, - "Erasure Coding", - uiRegistry, - detector, - scheduler, - GetConfigSchema, - logic.GetCurrentConfig, - logic.ApplyConfig, - ) -} diff --git a/weed/worker/tasks/vacuum/config_schema.go b/weed/worker/tasks/vacuum/config_schema.go deleted file mode 100644 index 3c063f4ee..000000000 --- a/weed/worker/tasks/vacuum/config_schema.go +++ /dev/null @@ -1,112 +0,0 @@ -package vacuum - -import ( - "github.com/seaweedfs/seaweedfs/weed/admin/config" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" -) - -// GetConfigSchema returns the schema for vacuum task configuration -func GetConfigSchema() *tasks.TaskConfigSchema { - return &tasks.TaskConfigSchema{ - TaskName: "vacuum", - DisplayName: "Volume Vacuum", - Description: "Reclaims disk space by removing deleted files from volumes", - Icon: "fas fa-broom text-primary", - Schema: config.Schema{ - Fields: []*config.Field{ - { - Name: "enabled", - JSONName: "enabled", - Type: config.FieldTypeBool, - DefaultValue: true, - Required: false, - DisplayName: "Enable Vacuum Tasks", - Description: "Whether vacuum tasks should be automatically created", - HelpText: "Toggle this to enable or disable automatic vacuum task generation", - InputType: "checkbox", - CSSClasses: "form-check-input", - }, - { - Name: "garbage_threshold", - JSONName: "garbage_threshold", - Type: config.FieldTypeFloat, - DefaultValue: 0.3, // 30% - MinValue: 0.0, - MaxValue: 1.0, - Required: true, - DisplayName: "Garbage Percentage Threshold", - Description: "Trigger vacuum when garbage ratio exceeds 
this percentage", - HelpText: "Volumes with more deleted content than this threshold will be vacuumed", - Placeholder: "0.30 (30%)", - Unit: config.UnitNone, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "scan_interval_seconds", - JSONName: "scan_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 2 * 60 * 60, // 2 hours - MinValue: 10 * 60, // 10 minutes - MaxValue: 24 * 60 * 60, // 24 hours - Required: true, - DisplayName: "Scan Interval", - Description: "How often to scan for volumes needing vacuum", - HelpText: "The system will check for volumes that need vacuuming at this interval", - Placeholder: "2", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "max_concurrent", - JSONName: "max_concurrent", - Type: config.FieldTypeInt, - DefaultValue: 2, - MinValue: 1, - MaxValue: 10, - Required: true, - DisplayName: "Max Concurrent Tasks", - Description: "Maximum number of vacuum tasks that can run simultaneously", - HelpText: "Limits the number of vacuum operations running at the same time to control system load", - Placeholder: "2 (default)", - Unit: config.UnitCount, - InputType: "number", - CSSClasses: "form-control", - }, - { - Name: "min_volume_age_seconds", - JSONName: "min_volume_age_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 24 * 60 * 60, // 24 hours - MinValue: 1 * 60 * 60, // 1 hour - MaxValue: 7 * 24 * 60 * 60, // 7 days - Required: true, - DisplayName: "Minimum Volume Age", - Description: "Only vacuum volumes older than this duration", - HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to", - Placeholder: "24", - Unit: config.UnitHours, - InputType: "interval", - CSSClasses: "form-control", - }, - { - Name: "min_interval_seconds", - JSONName: "min_interval_seconds", - Type: config.FieldTypeInterval, - DefaultValue: 7 * 24 * 60 * 60, // 7 days - MinValue: 1 * 24 * 60 * 60, // 1 day - MaxValue: 30 * 24 * 60 * 
60, // 30 days - Required: true, - DisplayName: "Minimum Interval", - Description: "Minimum time between vacuum operations on the same volume", - HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time", - Placeholder: "7", - Unit: config.UnitDays, - InputType: "interval", - CSSClasses: "form-control", - }, - }, - }, - } -} diff --git a/weed/worker/tasks/vacuum/schema_provider.go b/weed/worker/tasks/vacuum/schema_provider.go deleted file mode 100644 index a3872b049..000000000 --- a/weed/worker/tasks/vacuum/schema_provider.go +++ /dev/null @@ -1,18 +0,0 @@ -package vacuum - -import ( - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" -) - -// SchemaProvider implements the TaskConfigSchemaProvider interface for vacuum tasks -type SchemaProvider struct{} - -// GetConfigSchema returns the schema for vacuum task configuration -func (p *SchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema { - return GetConfigSchema() -} - -// init registers the vacuum schema provider -func init() { - tasks.RegisterTaskConfigSchema("vacuum", &SchemaProvider{}) -} diff --git a/weed/worker/tasks/vacuum/ui.go b/weed/worker/tasks/vacuum/ui.go deleted file mode 100644 index a6176ffa6..000000000 --- a/weed/worker/tasks/vacuum/ui.go +++ /dev/null @@ -1,112 +0,0 @@ -package vacuum - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// VacuumConfig represents the vacuum configuration matching the schema -type VacuumConfig struct { - Enabled bool `json:"enabled"` - GarbageThreshold float64 `json:"garbage_threshold"` - ScanIntervalSeconds int `json:"scan_interval_seconds"` - MaxConcurrent int `json:"max_concurrent"` - MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` - MinIntervalSeconds int `json:"min_interval_seconds"` -} - -// VacuumUILogic contains the business logic for vacuum UI operations -type VacuumUILogic 
struct { - detector *VacuumDetector - scheduler *VacuumScheduler -} - -// NewVacuumUILogic creates new vacuum UI logic -func NewVacuumUILogic(detector *VacuumDetector, scheduler *VacuumScheduler) *VacuumUILogic { - return &VacuumUILogic{ - detector: detector, - scheduler: scheduler, - } -} - -// GetCurrentConfig returns the current vacuum configuration -func (logic *VacuumUILogic) GetCurrentConfig() interface{} { - config := &VacuumConfig{ - // Default values from schema (matching task_config_schema.go) - Enabled: true, - GarbageThreshold: 0.3, // 30% - ScanIntervalSeconds: 2 * 60 * 60, // 2 hours - MaxConcurrent: 2, - MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - } - - // Get current values from detector - if logic.detector != nil { - config.Enabled = logic.detector.IsEnabled() - config.GarbageThreshold = logic.detector.GetGarbageThreshold() - config.ScanIntervalSeconds = int(logic.detector.ScanInterval().Seconds()) - config.MinVolumeAgeSeconds = int(logic.detector.GetMinVolumeAge().Seconds()) - } - - // Get current values from scheduler - if logic.scheduler != nil { - config.MaxConcurrent = logic.scheduler.GetMaxConcurrent() - config.MinIntervalSeconds = int(logic.scheduler.GetMinInterval().Seconds()) - } - - return config -} - -// ApplyConfig applies the vacuum configuration -func (logic *VacuumUILogic) ApplyConfig(config interface{}) error { - vacuumConfig, ok := config.(*VacuumConfig) - if !ok { - return fmt.Errorf("invalid configuration type for vacuum") - } - - // Apply to detector - if logic.detector != nil { - logic.detector.SetEnabled(vacuumConfig.Enabled) - logic.detector.SetGarbageThreshold(vacuumConfig.GarbageThreshold) - logic.detector.SetScanInterval(time.Duration(vacuumConfig.ScanIntervalSeconds) * time.Second) - logic.detector.SetMinVolumeAge(time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second) - } - - // Apply to scheduler - if logic.scheduler != nil { - 
logic.scheduler.SetEnabled(vacuumConfig.Enabled) - logic.scheduler.SetMaxConcurrent(vacuumConfig.MaxConcurrent) - logic.scheduler.SetMinInterval(time.Duration(vacuumConfig.MinIntervalSeconds) * time.Second) - } - - glog.V(1).Infof("Applied vacuum configuration: enabled=%v, threshold=%.1f%%, scan_interval=%ds, max_concurrent=%d", - vacuumConfig.Enabled, vacuumConfig.GarbageThreshold*100, vacuumConfig.ScanIntervalSeconds, vacuumConfig.MaxConcurrent) - - return nil -} - -// RegisterUI registers the vacuum UI provider with the UI registry -func RegisterUI(uiRegistry *types.UIRegistry, detector *VacuumDetector, scheduler *VacuumScheduler) { - logic := NewVacuumUILogic(detector, scheduler) - - tasks.CommonRegisterUI( - types.TaskTypeVacuum, - "Volume Vacuum", - uiRegistry, - detector, - scheduler, - GetConfigSchema, - logic.GetCurrentConfig, - logic.ApplyConfig, - ) -} - -// GetUIProvider returns the UI provider for external use -func GetUIProvider(uiRegistry *types.UIRegistry) types.TaskUIProvider { - return uiRegistry.GetProvider(types.TaskTypeVacuum) -} diff --git a/weed/worker/tasks/vacuum/vacuum_detector.go b/weed/worker/tasks/vacuum/vacuum_detector.go deleted file mode 100644 index 6d7230c6c..000000000 --- a/weed/worker/tasks/vacuum/vacuum_detector.go +++ /dev/null @@ -1,132 +0,0 @@ -package vacuum - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// VacuumDetector implements vacuum task detection using code instead of schemas -type VacuumDetector struct { - enabled bool - garbageThreshold float64 - minVolumeAge time.Duration - scanInterval time.Duration -} - -// Compile-time interface assertions -var ( - _ types.TaskDetector = (*VacuumDetector)(nil) - _ types.PolicyConfigurableDetector = (*VacuumDetector)(nil) -) - -// NewVacuumDetector creates a new simple vacuum detector -func NewVacuumDetector() *VacuumDetector { - return &VacuumDetector{ - enabled: true, - garbageThreshold: 0.3, - 
minVolumeAge: 24 * time.Hour, - scanInterval: 30 * time.Minute, - } -} - -// GetTaskType returns the task type -func (d *VacuumDetector) GetTaskType() types.TaskType { - return types.TaskTypeVacuum -} - -// ScanForTasks scans for volumes that need vacuum operations -func (d *VacuumDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { - if !d.enabled { - return nil, nil - } - - var results []*types.TaskDetectionResult - - for _, metric := range volumeMetrics { - // Check if volume needs vacuum - if metric.GarbageRatio >= d.garbageThreshold && metric.Age >= d.minVolumeAge { - // Higher priority for volumes with more garbage - priority := types.TaskPriorityNormal - if metric.GarbageRatio > 0.6 { - priority = types.TaskPriorityHigh - } - - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeVacuum, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: priority, - Reason: "Volume has excessive garbage requiring vacuum", - Parameters: map[string]interface{}{ - "garbage_ratio": metric.GarbageRatio, - "volume_age": metric.Age.String(), - }, - ScheduleAt: time.Now(), - } - results = append(results, result) - } - } - - glog.V(2).Infof("Vacuum detector found %d volumes needing vacuum", len(results)) - return results, nil -} - -// ScanInterval returns how often this detector should scan -func (d *VacuumDetector) ScanInterval() time.Duration { - return d.scanInterval -} - -// IsEnabled returns whether this detector is enabled -func (d *VacuumDetector) IsEnabled() bool { - return d.enabled -} - -// Configuration setters - -func (d *VacuumDetector) SetEnabled(enabled bool) { - d.enabled = enabled -} - -func (d *VacuumDetector) SetGarbageThreshold(threshold float64) { - d.garbageThreshold = threshold -} - -func (d *VacuumDetector) SetScanInterval(interval time.Duration) { - d.scanInterval = interval -} - -func (d *VacuumDetector) 
SetMinVolumeAge(age time.Duration) { - d.minVolumeAge = age -} - -// GetGarbageThreshold returns the current garbage threshold -func (d *VacuumDetector) GetGarbageThreshold() float64 { - return d.garbageThreshold -} - -// GetMinVolumeAge returns the minimum volume age -func (d *VacuumDetector) GetMinVolumeAge() time.Duration { - return d.minVolumeAge -} - -// GetScanInterval returns the scan interval -func (d *VacuumDetector) GetScanInterval() time.Duration { - return d.scanInterval -} - -// ConfigureFromPolicy configures the detector based on the maintenance policy -func (d *VacuumDetector) ConfigureFromPolicy(policy interface{}) { - // Type assert to the maintenance policy type we expect - if maintenancePolicy, ok := policy.(interface { - GetVacuumEnabled() bool - GetVacuumGarbageRatio() float64 - }); ok { - d.SetEnabled(maintenancePolicy.GetVacuumEnabled()) - d.SetGarbageThreshold(maintenancePolicy.GetVacuumGarbageRatio()) - } else { - glog.V(1).Infof("Could not configure vacuum detector from policy: unsupported policy type") - } -} diff --git a/weed/worker/tasks/vacuum/vacuum_register.go b/weed/worker/tasks/vacuum/vacuum_register.go index 7d930a88e..42a32d287 100644 --- a/weed/worker/tasks/vacuum/vacuum_register.go +++ b/weed/worker/tasks/vacuum/vacuum_register.go @@ -1,81 +1,7 @@ package vacuum -import ( - "fmt" - - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Factory creates vacuum task instances -type Factory struct { - *tasks.BaseTaskFactory -} - -// NewFactory creates a new vacuum task factory -func NewFactory() *Factory { - return &Factory{ - BaseTaskFactory: tasks.NewBaseTaskFactory( - types.TaskTypeVacuum, - []string{"vacuum", "storage"}, - "Vacuum operation to reclaim disk space by removing deleted files", - ), - } -} - -// Create creates a new vacuum task instance -func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if 
params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - task := NewTask(params.Server, params.VolumeID) - task.SetEstimatedDuration(task.EstimateTime(params)) - - return task, nil -} - -// Shared detector and scheduler instances -var ( - sharedDetector *VacuumDetector - sharedScheduler *VacuumScheduler -) - -// getSharedInstances returns the shared detector and scheduler instances -func getSharedInstances() (*VacuumDetector, *VacuumScheduler) { - if sharedDetector == nil { - sharedDetector = NewVacuumDetector() - } - if sharedScheduler == nil { - sharedScheduler = NewVacuumScheduler() - } - return sharedDetector, sharedScheduler -} - -// GetSharedInstances returns the shared detector and scheduler instances (public access) -func GetSharedInstances() (*VacuumDetector, *VacuumScheduler) { - return getSharedInstances() -} - // Auto-register this task when the package is imported func init() { - factory := NewFactory() - tasks.AutoRegister(types.TaskTypeVacuum, factory) - - // Get shared instances for all registrations - detector, scheduler := getSharedInstances() - - // Register with types registry - tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { - registry.RegisterTask(detector, scheduler) - }) - - // Register with UI registry using the same instances - tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { - RegisterUI(uiRegistry, detector, scheduler) - }) + // Use new architecture instead of old registration + initVacuumV2() } diff --git a/weed/worker/tasks/vacuum/vacuum_scheduler.go b/weed/worker/tasks/vacuum/vacuum_scheduler.go deleted file mode 100644 index 2b67a9f40..000000000 --- a/weed/worker/tasks/vacuum/vacuum_scheduler.go +++ /dev/null @@ -1,111 +0,0 @@ -package vacuum - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// VacuumScheduler implements vacuum task scheduling using code instead of schemas -type 
VacuumScheduler struct { - enabled bool - maxConcurrent int - minInterval time.Duration -} - -// Compile-time interface assertions -var ( - _ types.TaskScheduler = (*VacuumScheduler)(nil) -) - -// NewVacuumScheduler creates a new simple vacuum scheduler -func NewVacuumScheduler() *VacuumScheduler { - return &VacuumScheduler{ - enabled: true, - maxConcurrent: 2, - minInterval: 6 * time.Hour, - } -} - -// GetTaskType returns the task type -func (s *VacuumScheduler) GetTaskType() types.TaskType { - return types.TaskTypeVacuum -} - -// CanScheduleNow determines if a vacuum task can be scheduled right now -func (s *VacuumScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { - // Check if scheduler is enabled - if !s.enabled { - return false - } - - // Check concurrent limit - runningVacuumCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeVacuum { - runningVacuumCount++ - } - } - - if runningVacuumCount >= s.maxConcurrent { - return false - } - - // Check if there's an available worker with vacuum capability - for _, worker := range availableWorkers { - if worker.CurrentLoad < worker.MaxConcurrent { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeVacuum { - return true - } - } - } - } - - return false -} - -// GetPriority returns the priority for this task -func (s *VacuumScheduler) GetPriority(task *types.Task) types.TaskPriority { - // Could adjust priority based on task parameters - if params, ok := task.Parameters["garbage_ratio"].(float64); ok { - if params > 0.8 { - return types.TaskPriorityHigh - } - } - return task.Priority -} - -// GetMaxConcurrent returns max concurrent tasks of this type -func (s *VacuumScheduler) GetMaxConcurrent() int { - return s.maxConcurrent -} - -// GetDefaultRepeatInterval returns the default interval to wait before repeating vacuum tasks -func (s *VacuumScheduler) GetDefaultRepeatInterval() 
time.Duration { - return s.minInterval -} - -// IsEnabled returns whether this scheduler is enabled -func (s *VacuumScheduler) IsEnabled() bool { - return s.enabled -} - -// Configuration setters - -func (s *VacuumScheduler) SetEnabled(enabled bool) { - s.enabled = enabled -} - -func (s *VacuumScheduler) SetMaxConcurrent(max int) { - s.maxConcurrent = max -} - -func (s *VacuumScheduler) SetMinInterval(interval time.Duration) { - s.minInterval = interval -} - -// GetMinInterval returns the minimum interval -func (s *VacuumScheduler) GetMinInterval() time.Duration { - return s.minInterval -} diff --git a/weed/worker/tasks/vacuum/vacuum_v2.go b/weed/worker/tasks/vacuum/vacuum_v2.go new file mode 100644 index 000000000..29129cb0f --- /dev/null +++ b/weed/worker/tasks/vacuum/vacuum_v2.go @@ -0,0 +1,269 @@ +package vacuum + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// VacuumConfigV2 extends BaseConfig with vacuum-specific settings +type VacuumConfigV2 struct { + base.BaseConfig + GarbageThreshold float64 `json:"garbage_threshold"` + MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` + MinIntervalSeconds int `json:"min_interval_seconds"` +} + +// ToMap converts config to map (extend base functionality) +func (c *VacuumConfigV2) ToMap() map[string]interface{} { + result := c.BaseConfig.ToMap() + result["garbage_threshold"] = c.GarbageThreshold + result["min_volume_age_seconds"] = c.MinVolumeAgeSeconds + result["min_interval_seconds"] = c.MinIntervalSeconds + return result +} + +// FromMap loads config from map (extend base functionality) +func (c *VacuumConfigV2) FromMap(data map[string]interface{}) error { + // Load base config first + if err := c.BaseConfig.FromMap(data); err != nil { + return err + } + + // Load vacuum-specific config + if threshold, ok := data["garbage_threshold"].(float64); ok { + 
c.GarbageThreshold = threshold + } + if ageSeconds, ok := data["min_volume_age_seconds"].(int); ok { + c.MinVolumeAgeSeconds = ageSeconds + } + if intervalSeconds, ok := data["min_interval_seconds"].(int); ok { + c.MinIntervalSeconds = intervalSeconds + } + return nil +} + +// vacuumDetection implements the detection logic for vacuum tasks +func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + vacuumConfig := config.(*VacuumConfigV2) + var results []*types.TaskDetectionResult + minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second + + for _, metric := range metrics { + // Check if volume needs vacuum + if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { + priority := types.TaskPriorityNormal + if metric.GarbageRatio > 0.6 { + priority = types.TaskPriorityHigh + } + + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeVacuum, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: priority, + Reason: "Volume has excessive garbage requiring vacuum", + Parameters: map[string]interface{}{ + "garbage_ratio": metric.GarbageRatio, + "volume_age": metric.Age.String(), + }, + ScheduleAt: time.Now(), + } + results = append(results, result) + } + } + + return results, nil +} + +// vacuumScheduling implements the scheduling logic for vacuum tasks +func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + vacuumConfig := config.(*VacuumConfigV2) + + // Count running vacuum tasks + runningVacuumCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeVacuum { + runningVacuumCount++ + } + } + + // Check concurrency limit + if runningVacuumCount >= vacuumConfig.MaxConcurrent { + return false + } + + // 
Check for available workers with vacuum capability + for _, worker := range availableWorkers { + if worker.CurrentLoad < worker.MaxConcurrent { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeVacuum { + return true + } + } + } + } + + return false +} + +// createVacuumTask creates a vacuum task instance +func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) { + // Validate parameters + if params.VolumeID == 0 { + return nil, fmt.Errorf("volume_id is required") + } + if params.Server == "" { + return nil, fmt.Errorf("server is required") + } + + // Use existing vacuum task implementation + task := NewTask(params.Server, params.VolumeID) + task.SetEstimatedDuration(task.EstimateTime(params)) + return task, nil +} + +// getVacuumConfigSpec returns the configuration schema for vacuum tasks +func getVacuumConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Vacuum Tasks", + Description: "Whether vacuum tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic vacuum task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "garbage_threshold", + JSONName: "garbage_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.3, + MinValue: 0.0, + MaxValue: 1.0, + Required: true, + DisplayName: "Garbage Percentage Threshold", + Description: "Trigger vacuum when garbage ratio exceeds this percentage", + HelpText: "Volumes with more deleted content than this threshold will be vacuumed", + Placeholder: "0.30 (30%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 2 * 60 * 60, + MinValue: 10 * 60, + MaxValue: 24 * 60 * 60, 
+ Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing vacuum", + HelpText: "The system will check for volumes that need vacuuming at this interval", + Placeholder: "2", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 1, + MaxValue: 10, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of vacuum tasks that can run simultaneously", + HelpText: "Limits the number of vacuum operations running at the same time to control system load", + Placeholder: "2 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_volume_age_seconds", + JSONName: "min_volume_age_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 24 * 60 * 60, + MinValue: 1 * 60 * 60, + MaxValue: 7 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Volume Age", + Description: "Only vacuum volumes older than this duration", + HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to", + Placeholder: "24", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "min_interval_seconds", + JSONName: "min_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 7 * 24 * 60 * 60, + MinValue: 1 * 24 * 60 * 60, + MaxValue: 30 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Interval", + Description: "Minimum time between vacuum operations on the same volume", + HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time", + Placeholder: "7", + Unit: config.UnitDays, + InputType: "interval", + CSSClasses: "form-control", + }, + }, + } +} + +// initVacuumV2 registers the refactored vacuum task (replaces the old registration) +func initVacuumV2() { + // Create configuration 
instance + config := &VacuumConfigV2{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 2 * 60 * 60, // 2 hours + MaxConcurrent: 2, + }, + GarbageThreshold: 0.3, // 30% + MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours + MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days + } + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeVacuum, + Name: "vacuum", + DisplayName: "Volume Vacuum", + Description: "Reclaims disk space by removing deleted files from volumes", + Icon: "fas fa-broom text-primary", + Capabilities: []string{"vacuum", "storage"}, + + Config: config, + ConfigSpec: getVacuumConfigSpec(), + CreateTask: createVacuumTask, + DetectionFunc: vacuumDetection, + ScanInterval: 2 * time.Hour, + SchedulingFunc: vacuumScheduling, + MaxConcurrent: 2, + RepeatInterval: 7 * 24 * time.Hour, + } + + // Register everything with a single function call! + base.RegisterTask(taskDef) +} diff --git a/weed/worker/tasks/vacuum_v2/vacuum.go b/weed/worker/tasks/vacuum_v2/vacuum.go new file mode 100644 index 000000000..3034d07d0 --- /dev/null +++ b/weed/worker/tasks/vacuum_v2/vacuum.go @@ -0,0 +1,260 @@ +package vacuum_v2 + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// VacuumConfig extends BaseConfig with vacuum-specific settings +type VacuumConfig struct { + base.BaseConfig + GarbageThreshold float64 `json:"garbage_threshold"` + MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` + MinIntervalSeconds int `json:"min_interval_seconds"` +} + +// ToMap converts config to map (extend base functionality) +func (c *VacuumConfig) ToMap() map[string]interface{} { + result := c.BaseConfig.ToMap() + result["garbage_threshold"] = c.GarbageThreshold + result["min_volume_age_seconds"] = c.MinVolumeAgeSeconds + 
result["min_interval_seconds"] = c.MinIntervalSeconds + return result +} + +// FromMap loads config from map (extend base functionality) +func (c *VacuumConfig) FromMap(data map[string]interface{}) error { + // Load base config first + if err := c.BaseConfig.FromMap(data); err != nil { + return err + } + + // Load vacuum-specific config + if threshold, ok := data["garbage_threshold"].(float64); ok { + c.GarbageThreshold = threshold + } + if ageSeconds, ok := data["min_volume_age_seconds"].(int); ok { + c.MinVolumeAgeSeconds = ageSeconds + } + if intervalSeconds, ok := data["min_interval_seconds"].(int); ok { + c.MinIntervalSeconds = intervalSeconds + } + return nil +} + +// vacuumDetection implements the detection logic for vacuum tasks +func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + vacuumConfig := config.(*VacuumConfig) + var results []*types.TaskDetectionResult + minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second + + for _, metric := range metrics { + // Check if volume needs vacuum + if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { + priority := types.TaskPriorityNormal + if metric.GarbageRatio > 0.6 { + priority = types.TaskPriorityHigh + } + + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeVacuum, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: priority, + Reason: "Volume has excessive garbage requiring vacuum", + Parameters: map[string]interface{}{ + "garbage_ratio": metric.GarbageRatio, + "volume_age": metric.Age.String(), + }, + ScheduleAt: time.Now(), + } + results = append(results, result) + } + } + + return results, nil +} + +// vacuumScheduling implements the scheduling logic for vacuum tasks +func vacuumScheduling(task *types.Task, runningTasks []*types.Task, 
availableWorkers []*types.Worker, config base.TaskConfig) bool { + vacuumConfig := config.(*VacuumConfig) + + // Count running vacuum tasks + runningVacuumCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeVacuum { + runningVacuumCount++ + } + } + + // Check concurrency limit + if runningVacuumCount >= vacuumConfig.MaxConcurrent { + return false + } + + // Check for available workers with vacuum capability + for _, worker := range availableWorkers { + if worker.CurrentLoad < worker.MaxConcurrent { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeVacuum { + return true + } + } + } + } + + return false +} + +// createVacuumTask creates a vacuum task instance +func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) { + // Validate parameters + if params.VolumeID == 0 { + return nil, fmt.Errorf("volume_id is required") + } + if params.Server == "" { + return nil, fmt.Errorf("server is required") + } + + // Reuse existing vacuum task implementation + task := vacuum.NewTask(params.Server, params.VolumeID) + task.SetEstimatedDuration(task.EstimateTime(params)) + return task, nil +} + +// getConfigSpec returns the configuration schema for vacuum tasks +func getConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Vacuum Tasks", + Description: "Whether vacuum tasks should be automatically created", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "garbage_threshold", + JSONName: "garbage_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.3, + MinValue: 0.0, + MaxValue: 1.0, + Required: true, + DisplayName: "Garbage Percentage Threshold", + Description: "Trigger vacuum when garbage ratio exceeds this percentage", + Placeholder: "0.30 (30%)", + Unit: config.UnitNone, + 
InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 2 * 60 * 60, + MinValue: 10 * 60, + MaxValue: 24 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing vacuum", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 1, + MaxValue: 10, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of vacuum tasks that can run simultaneously", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_volume_age_seconds", + JSONName: "min_volume_age_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 24 * 60 * 60, + MinValue: 1 * 60 * 60, + MaxValue: 7 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Volume Age", + Description: "Only vacuum volumes older than this duration", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "min_interval_seconds", + JSONName: "min_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 7 * 24 * 60 * 60, + MinValue: 1 * 24 * 60 * 60, + MaxValue: 30 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Interval", + Description: "Minimum time between vacuum operations on the same volume", + Unit: config.UnitDays, + InputType: "interval", + CSSClasses: "form-control", + }, + }, + } +} + +// RegisterVacuumV2 registers the refactored vacuum task +func init() { + // Create configuration instance + config := &VacuumConfig{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 2 * 60 * 60, // 2 hours + MaxConcurrent: 2, + }, + GarbageThreshold: 0.3, // 30% + MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours + MinIntervalSeconds: 7 * 24 * 60 * 60, 
// 7 days + } + + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeVacuum, + Name: "vacuum_v2", + DisplayName: "Volume Vacuum (V2)", + Description: "Reclaims disk space by removing deleted files from volumes", + Icon: "fas fa-broom text-primary", + Capabilities: []string{"vacuum", "storage"}, + + Config: config, + ConfigSpec: getConfigSpec(), + CreateTask: createVacuumTask, + DetectionFunc: vacuumDetection, + ScanInterval: 2 * time.Hour, + SchedulingFunc: vacuumScheduling, + MaxConcurrent: 2, + RepeatInterval: 7 * 24 * time.Hour, + } + + // Register everything with a single function call! + base.RegisterTask(taskDef) +} |
