aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/base/generic_components.go
blob: 27ad1bb293a5c4606f67bc518d2fac0a7f2b85a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package base

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// GenericDetector implements TaskDetector using function-based logic
type GenericDetector struct {
	taskDef *TaskDefinition
}

// NewGenericDetector creates a detector from a task definition
func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector {
	return &GenericDetector{taskDef: taskDef}
}

// GetTaskType returns the task type
func (d *GenericDetector) GetTaskType() types.TaskType {
	return d.taskDef.Type
}

// ScanForTasks scans using the task definition's detection function
func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
	if d.taskDef.DetectionFunc == nil {
		return nil, nil
	}
	return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config)
}

// ScanInterval returns the scan interval from task definition
func (d *GenericDetector) ScanInterval() time.Duration {
	if d.taskDef.ScanInterval > 0 {
		return d.taskDef.ScanInterval
	}
	return 30 * time.Minute // Default
}

// IsEnabled returns whether this detector is enabled
func (d *GenericDetector) IsEnabled() bool {
	return d.taskDef.Config.IsEnabled()
}

// GenericScheduler implements TaskScheduler using function-based logic
type GenericScheduler struct {
	taskDef *TaskDefinition
}

// NewGenericScheduler creates a scheduler from a task definition
func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler {
	return &GenericScheduler{taskDef: taskDef}
}

// GetTaskType returns the task type
func (s *GenericScheduler) GetTaskType() types.TaskType {
	return s.taskDef.Type
}

// CanScheduleNow determines if a task can be scheduled using the task definition's function
func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
	if s.taskDef.SchedulingFunc == nil {
		return s.defaultCanSchedule(task, runningTasks, availableWorkers)
	}
	return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config)
}

// defaultCanSchedule provides default scheduling logic
func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
	if !s.taskDef.Config.IsEnabled() {
		return false
	}

	// Count running tasks of this type
	runningCount := 0
	for _, runningTask := range runningTasks {
		if runningTask.Type == s.taskDef.Type {
			runningCount++
		}
	}

	// Check concurrency limit
	maxConcurrent := s.taskDef.MaxConcurrent
	if maxConcurrent <= 0 {
		maxConcurrent = 1 // Default
	}
	if runningCount >= maxConcurrent {
		return false
	}

	// Check if we have available workers
	for _, worker := range availableWorkers {
		if worker.CurrentLoad < worker.MaxConcurrent {
			for _, capability := range worker.Capabilities {
				if capability == s.taskDef.Type {
					return true
				}
			}
		}
	}

	return false
}

// GetPriority returns the priority for this task
func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority {
	return task.Priority
}

// GetMaxConcurrent returns max concurrent tasks
func (s *GenericScheduler) GetMaxConcurrent() int {
	if s.taskDef.MaxConcurrent > 0 {
		return s.taskDef.MaxConcurrent
	}
	return 1 // Default
}

// GetDefaultRepeatInterval returns the default repeat interval
func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration {
	if s.taskDef.RepeatInterval > 0 {
		return s.taskDef.RepeatInterval
	}
	return 24 * time.Hour // Default
}

// IsEnabled returns whether this scheduler is enabled
func (s *GenericScheduler) IsEnabled() bool {
	return s.taskDef.Config.IsEnabled()
}