aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/base/generic_components.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/base/generic_components.go')
-rw-r--r--weed/worker/tasks/base/generic_components.go129
1 files changed, 129 insertions, 0 deletions
diff --git a/weed/worker/tasks/base/generic_components.go b/weed/worker/tasks/base/generic_components.go
new file mode 100644
index 000000000..27ad1bb29
--- /dev/null
+++ b/weed/worker/tasks/base/generic_components.go
@@ -0,0 +1,129 @@
+package base
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// GenericDetector implements TaskDetector using function-based logic
+type GenericDetector struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericDetector creates a detector from a task definition
+func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector {
+ return &GenericDetector{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (d *GenericDetector) GetTaskType() types.TaskType {
+ return d.taskDef.Type
+}
+
+// ScanForTasks scans using the task definition's detection function
+func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
+ if d.taskDef.DetectionFunc == nil {
+ return nil, nil
+ }
+ return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config)
+}
+
+// ScanInterval returns the scan interval from task definition
+func (d *GenericDetector) ScanInterval() time.Duration {
+ if d.taskDef.ScanInterval > 0 {
+ return d.taskDef.ScanInterval
+ }
+ return 30 * time.Minute // Default
+}
+
+// IsEnabled returns whether this detector is enabled
+func (d *GenericDetector) IsEnabled() bool {
+ return d.taskDef.Config.IsEnabled()
+}
+
+// GenericScheduler implements TaskScheduler using function-based logic
+type GenericScheduler struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericScheduler creates a scheduler from a task definition
+func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler {
+ return &GenericScheduler{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (s *GenericScheduler) GetTaskType() types.TaskType {
+ return s.taskDef.Type
+}
+
+// CanScheduleNow determines if a task can be scheduled using the task definition's function
+func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if s.taskDef.SchedulingFunc == nil {
+ return s.defaultCanSchedule(task, runningTasks, availableWorkers)
+ }
+ return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config)
+}
+
+// defaultCanSchedule provides default scheduling logic
+func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if !s.taskDef.Config.IsEnabled() {
+ return false
+ }
+
+ // Count running tasks of this type
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == s.taskDef.Type {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ maxConcurrent := s.taskDef.MaxConcurrent
+ if maxConcurrent <= 0 {
+ maxConcurrent = 1 // Default
+ }
+ if runningCount >= maxConcurrent {
+ return false
+ }
+
+ // Check if we have available workers
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == s.taskDef.Type {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// GetPriority returns the priority for this task
+func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority {
+ return task.Priority
+}
+
+// GetMaxConcurrent returns max concurrent tasks
+func (s *GenericScheduler) GetMaxConcurrent() int {
+ if s.taskDef.MaxConcurrent > 0 {
+ return s.taskDef.MaxConcurrent
+ }
+ return 1 // Default
+}
+
+// GetDefaultRepeatInterval returns the default repeat interval
+func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration {
+ if s.taskDef.RepeatInterval > 0 {
+ return s.taskDef.RepeatInterval
+ }
+ return 24 * time.Hour // Default
+}
+
+// IsEnabled returns whether this scheduler is enabled
+func (s *GenericScheduler) IsEnabled() bool {
+ return s.taskDef.Config.IsEnabled()
+}