aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/erasure_coding/ec_scheduler.go
blob: b2366bb06de884f41d7f8425d6ae4bae875c37ff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package erasure_coding

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// Scheduler implements erasure coding task scheduling
type Scheduler struct {
	maxConcurrent int
	enabled       bool
}

// NewScheduler creates a new erasure coding scheduler
func NewScheduler() *Scheduler {
	return &Scheduler{
		maxConcurrent: 1,     // Conservative default
		enabled:       false, // Conservative default
	}
}

// GetTaskType returns the task type
func (s *Scheduler) GetTaskType() types.TaskType {
	return types.TaskTypeErasureCoding
}

// CanScheduleNow determines if an erasure coding task can be scheduled now
func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
	if !s.enabled {
		return false
	}

	// Check if we have available workers
	if len(availableWorkers) == 0 {
		return false
	}

	// Count running EC tasks
	runningCount := 0
	for _, runningTask := range runningTasks {
		if runningTask.Type == types.TaskTypeErasureCoding {
			runningCount++
		}
	}

	// Check concurrency limit
	if runningCount >= s.maxConcurrent {
		glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent)
		return false
	}

	// Check if any worker can handle EC tasks
	for _, worker := range availableWorkers {
		for _, capability := range worker.Capabilities {
			if capability == types.TaskTypeErasureCoding {
				glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID)
				return true
			}
		}
	}

	return false
}

// GetMaxConcurrent returns the maximum number of concurrent tasks
func (s *Scheduler) GetMaxConcurrent() int {
	return s.maxConcurrent
}

// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks
func (s *Scheduler) GetDefaultRepeatInterval() time.Duration {
	return 24 * time.Hour // Don't repeat EC for 24 hours
}

// GetPriority returns the priority for this task
func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
	return types.TaskPriorityLow // EC is not urgent
}

// WasTaskRecentlyCompleted checks if a similar task was recently completed
func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool {
	// Don't repeat EC for 24 hours
	interval := 24 * time.Hour
	cutoff := now.Add(-interval)

	for _, completedTask := range completedTasks {
		if completedTask.Type == types.TaskTypeErasureCoding &&
			completedTask.VolumeID == task.VolumeID &&
			completedTask.Server == task.Server &&
			completedTask.Status == types.TaskStatusCompleted &&
			completedTask.CompletedAt != nil &&
			completedTask.CompletedAt.After(cutoff) {
			return true
		}
	}
	return false
}

// IsEnabled returns whether this task type is enabled
func (s *Scheduler) IsEnabled() bool {
	return s.enabled
}

// Configuration setters

func (s *Scheduler) SetEnabled(enabled bool) {
	s.enabled = enabled
}

func (s *Scheduler) SetMaxConcurrent(max int) {
	s.maxConcurrent = max
}