diff options
Diffstat (limited to 'weed/worker/tasks/balance/balance_scheduler.go')
| -rw-r--r-- | weed/worker/tasks/balance/balance_scheduler.go | 197 |
1 files changed, 197 insertions, 0 deletions
diff --git a/weed/worker/tasks/balance/balance_scheduler.go b/weed/worker/tasks/balance/balance_scheduler.go new file mode 100644 index 000000000..a8fefe465 --- /dev/null +++ b/weed/worker/tasks/balance/balance_scheduler.go @@ -0,0 +1,197 @@ +package balance + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// BalanceScheduler implements TaskScheduler for balance tasks +type BalanceScheduler struct { + enabled bool + maxConcurrent int + minInterval time.Duration + lastScheduled map[string]time.Time // track when we last scheduled a balance for each task type + minServerCount int + moveDuringOffHours bool + offHoursStart string + offHoursEnd string +} + +// Compile-time interface assertions +var ( + _ types.TaskScheduler = (*BalanceScheduler)(nil) +) + +// NewBalanceScheduler creates a new balance scheduler +func NewBalanceScheduler() *BalanceScheduler { + return &BalanceScheduler{ + enabled: true, + maxConcurrent: 1, // Only run one balance at a time + minInterval: 6 * time.Hour, + lastScheduled: make(map[string]time.Time), + minServerCount: 3, + moveDuringOffHours: true, + offHoursStart: "23:00", + offHoursEnd: "06:00", + } +} + +// GetTaskType returns the task type +func (s *BalanceScheduler) GetTaskType() types.TaskType { + return types.TaskTypeBalance +} + +// CanScheduleNow determines if a balance task can be scheduled +func (s *BalanceScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { + if !s.enabled { + return false + } + + // Count running balance tasks + runningBalanceCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeBalance { + runningBalanceCount++ + } + } + + // Check concurrency limit + if runningBalanceCount >= s.maxConcurrent { + glog.V(3).Infof("⏸️ Balance task blocked: too many running (%d >= %d)", runningBalanceCount, s.maxConcurrent) + return false + } + + // Check minimum interval between balance operations + if lastTime, exists := s.lastScheduled["balance"]; exists { + if time.Since(lastTime) < s.minInterval { + timeLeft := s.minInterval - time.Since(lastTime) + glog.V(3).Infof("⏸️ Balance task blocked: too soon (wait %v)", timeLeft) + return false + } + } + + // Check if we have available workers + availableWorkerCount := 0 + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeBalance { + availableWorkerCount++ + break + } + } + } + + if availableWorkerCount == 0 { + glog.V(3).Infof("⏸️ Balance task blocked: no available workers") + return false + } + + // All checks passed - can schedule + s.lastScheduled["balance"] = time.Now() + glog.V(2).Infof("✅ Balance task can be scheduled (running: %d/%d, workers: %d)", + runningBalanceCount, s.maxConcurrent, availableWorkerCount) + return true +} + +// GetPriority returns the priority for balance tasks +func (s *BalanceScheduler) GetPriority(task *types.Task) types.TaskPriority { + // Balance is typically normal priority - not urgent but important for optimization + return types.TaskPriorityNormal +} + +// GetMaxConcurrent returns the maximum concurrent balance tasks +func (s *BalanceScheduler) GetMaxConcurrent() int { + return s.maxConcurrent +} + +// GetDefaultRepeatInterval returns the default interval to wait before repeating balance tasks +func (s *BalanceScheduler) GetDefaultRepeatInterval() time.Duration { + return s.minInterval +} + +// IsEnabled returns whether the scheduler is enabled +func (s *BalanceScheduler) IsEnabled() bool { + return s.enabled +} + +// SetEnabled sets whether the scheduler is enabled +func (s *BalanceScheduler) SetEnabled(enabled bool) { + s.enabled = enabled + glog.V(1).Infof("🔄 Balance scheduler enabled: %v", enabled) +} + +// SetMaxConcurrent sets the maximum concurrent balance tasks +func (s *BalanceScheduler) SetMaxConcurrent(max int) { + s.maxConcurrent = max + glog.V(1).Infof("🔄 Balance max concurrent set to: %d", max) +} + +// SetMinInterval sets the minimum interval between balance operations +func (s *BalanceScheduler) SetMinInterval(interval time.Duration) { + s.minInterval = interval + glog.V(1).Infof("🔄 Balance minimum interval set to: %v", interval) +} + +// GetLastScheduled returns when we last scheduled this task type +func (s *BalanceScheduler) GetLastScheduled(taskKey string) time.Time { + if lastTime, exists := s.lastScheduled[taskKey]; exists { + return lastTime + } + return time.Time{} +} + +// SetLastScheduled updates when we last scheduled this task type +func (s *BalanceScheduler) SetLastScheduled(taskKey string, when time.Time) { + s.lastScheduled[taskKey] = when +} + +// GetMinServerCount returns the minimum server count +func (s *BalanceScheduler) GetMinServerCount() int { + return s.minServerCount +} + +// SetMinServerCount sets the minimum server count +func (s *BalanceScheduler) SetMinServerCount(count int) { + s.minServerCount = count + glog.V(1).Infof("🔄 Balance minimum server count set to: %d", count) +} + +// GetMoveDuringOffHours returns whether to move only during off-hours +func (s *BalanceScheduler) GetMoveDuringOffHours() bool { + return s.moveDuringOffHours +} + +// SetMoveDuringOffHours sets whether to move only during off-hours +func (s *BalanceScheduler) SetMoveDuringOffHours(enabled bool) { + s.moveDuringOffHours = enabled + glog.V(1).Infof("🔄 Balance move during off-hours: %v", enabled) +} + +// GetOffHoursStart returns the off-hours start time +func (s *BalanceScheduler) GetOffHoursStart() string { + return s.offHoursStart +} + +// SetOffHoursStart sets the off-hours start time +func (s *BalanceScheduler) SetOffHoursStart(start string) { + s.offHoursStart = start + glog.V(1).Infof("🔄 Balance off-hours start time set to: %s", start) +} + +// GetOffHoursEnd returns the off-hours end time +func (s *BalanceScheduler) GetOffHoursEnd() string { + return s.offHoursEnd +} + +// SetOffHoursEnd sets the off-hours end time +func (s *BalanceScheduler) SetOffHoursEnd(end string) { + s.offHoursEnd = end + glog.V(1).Infof("🔄 Balance off-hours end time set to: %s", end) +} + +// GetMinInterval returns the minimum interval +func (s *BalanceScheduler) GetMinInterval() time.Duration { + return s.minInterval +} |
