diff options
Diffstat (limited to 'weed/admin/maintenance/maintenance_queue_test.go')
| -rw-r--r-- | weed/admin/maintenance/maintenance_queue_test.go | 353 |
1 files changed, 353 insertions, 0 deletions
diff --git a/weed/admin/maintenance/maintenance_queue_test.go b/weed/admin/maintenance/maintenance_queue_test.go new file mode 100644 index 000000000..2c38471a0 --- /dev/null +++ b/weed/admin/maintenance/maintenance_queue_test.go @@ -0,0 +1,353 @@ +package maintenance + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" +) + +// Test suite for canScheduleTaskNow() function and related scheduling logic +// +// This test suite ensures that: +// 1. The fallback scheduling logic works correctly when no integration is present +// 2. Task concurrency limits are properly enforced per task type +// 3. Different task types don't interfere with each other's concurrency limits +// 4. Custom policies with higher concurrency limits work correctly +// 5. Edge cases behave as pinned by the tests: a nil task panics, while an empty task type is still schedulable +// 6. Helper functions (GetRunningTaskCount, canExecuteTaskType, etc.) work correctly +// +// Background: The canScheduleTaskNow() function is critical for task assignment. +// It was previously failing due to an overly restrictive integration scheduler, +// so we implemented a temporary fix that bypasses the integration and uses +// fallback logic based on simple concurrency limits per task type. 
+ +func TestCanScheduleTaskNow_FallbackLogic(t *testing.T) { + // Test the current implementation which uses fallback logic + mq := &MaintenanceQueue{ + tasks: make(map[string]*MaintenanceTask), + pendingTasks: []*MaintenanceTask{}, + workers: make(map[string]*MaintenanceWorker), + policy: nil, // No policy for default behavior + integration: nil, // No integration to force fallback + } + + task := &MaintenanceTask{ + ID: "test-task-1", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusPending, + } + + // Should return true with fallback logic (no running tasks, default max concurrent = 1) + result := mq.canScheduleTaskNow(task) + if !result { + t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false") + } +} + +func TestCanScheduleTaskNow_FallbackWithRunningTasks(t *testing.T) { + // Test fallback logic when there are already running tasks + mq := &MaintenanceQueue{ + tasks: map[string]*MaintenanceTask{ + "running-task": { + ID: "running-task", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusInProgress, + }, + }, + pendingTasks: []*MaintenanceTask{}, + workers: make(map[string]*MaintenanceWorker), + policy: nil, + integration: nil, + } + + task := &MaintenanceTask{ + ID: "test-task-2", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusPending, + } + + // Should return false because max concurrent is 1 and we have 1 running task + result := mq.canScheduleTaskNow(task) + if result { + t.Errorf("Expected canScheduleTaskNow to return false when at capacity, got true") + } +} + +func TestCanScheduleTaskNow_DifferentTaskTypes(t *testing.T) { + // Test that different task types don't interfere with each other + mq := &MaintenanceQueue{ + tasks: map[string]*MaintenanceTask{ + "running-ec-task": { + ID: "running-ec-task", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusInProgress, + }, + }, + pendingTasks: []*MaintenanceTask{}, + workers: make(map[string]*MaintenanceWorker), 
+ policy: nil, + integration: nil, + } + + // Test vacuum task when EC task is running + vacuumTask := &MaintenanceTask{ + ID: "vacuum-task", + Type: MaintenanceTaskType("vacuum"), + Status: TaskStatusPending, + } + + // Should return true because vacuum and erasure_coding are different task types + result := mq.canScheduleTaskNow(vacuumTask) + if !result { + t.Errorf("Expected canScheduleTaskNow to return true for different task type, got false") + } + + // Test another EC task when one is already running + ecTask := &MaintenanceTask{ + ID: "ec-task", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusPending, + } + + // Should return false because max concurrent for EC is 1 and we have 1 running + result = mq.canScheduleTaskNow(ecTask) + if result { + t.Errorf("Expected canScheduleTaskNow to return false for same task type at capacity, got true") + } +} + +func TestCanScheduleTaskNow_WithIntegration(t *testing.T) { + // Test with a real MaintenanceIntegration (will use fallback logic in current implementation) + policy := &MaintenancePolicy{ + TaskPolicies: make(map[string]*worker_pb.TaskPolicy), + GlobalMaxConcurrent: 10, + DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds + DefaultCheckIntervalSeconds: 60 * 60, // 1 hour in seconds + } + mq := NewMaintenanceQueue(policy) + + // Create a basic integration (this would normally be more complex) + integration := NewMaintenanceIntegration(mq, policy) + mq.SetIntegration(integration) + + task := &MaintenanceTask{ + ID: "test-task-3", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusPending, + } + + // With our current implementation (fallback logic), this should return true + result := mq.canScheduleTaskNow(task) + if !result { + t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false") + } +} + +func TestGetRunningTaskCount(t *testing.T) { + // Test the helper function used by fallback logic + mq := &MaintenanceQueue{ + tasks: 
map[string]*MaintenanceTask{ + "task1": { + ID: "task1", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusInProgress, + }, + "task2": { + ID: "task2", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusAssigned, + }, + "task3": { + ID: "task3", + Type: MaintenanceTaskType("vacuum"), + Status: TaskStatusInProgress, + }, + "task4": { + ID: "task4", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusCompleted, + }, + }, + pendingTasks: []*MaintenanceTask{}, + workers: make(map[string]*MaintenanceWorker), + } + + // Should count 2 running EC tasks (in_progress + assigned) + ecCount := mq.GetRunningTaskCount(MaintenanceTaskType("erasure_coding")) + if ecCount != 2 { + t.Errorf("Expected 2 running EC tasks, got %d", ecCount) + } + + // Should count 1 running vacuum task + vacuumCount := mq.GetRunningTaskCount(MaintenanceTaskType("vacuum")) + if vacuumCount != 1 { + t.Errorf("Expected 1 running vacuum task, got %d", vacuumCount) + } + + // Should count 0 running balance tasks + balanceCount := mq.GetRunningTaskCount(MaintenanceTaskType("balance")) + if balanceCount != 0 { + t.Errorf("Expected 0 running balance tasks, got %d", balanceCount) + } +} + +func TestCanExecuteTaskType(t *testing.T) { + // Test the fallback logic helper function + mq := &MaintenanceQueue{ + tasks: map[string]*MaintenanceTask{ + "running-task": { + ID: "running-task", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusInProgress, + }, + }, + pendingTasks: []*MaintenanceTask{}, + workers: make(map[string]*MaintenanceWorker), + policy: nil, // Will use default max concurrent = 1 + integration: nil, + } + + // Should return false for EC (1 running, max = 1) + result := mq.canExecuteTaskType(MaintenanceTaskType("erasure_coding")) + if result { + t.Errorf("Expected canExecuteTaskType to return false for EC at capacity, got true") + } + + // Should return true for vacuum (0 running, max = 1) + result = 
mq.canExecuteTaskType(MaintenanceTaskType("vacuum")) + if !result { + t.Errorf("Expected canExecuteTaskType to return true for vacuum, got false") + } +} + +func TestGetMaxConcurrentForTaskType_DefaultBehavior(t *testing.T) { + // Test the default behavior when no policy or integration is set + mq := &MaintenanceQueue{ + tasks: make(map[string]*MaintenanceTask), + pendingTasks: []*MaintenanceTask{}, + workers: make(map[string]*MaintenanceWorker), + policy: nil, + integration: nil, + } + + // Should return default value of 1 + maxConcurrent := mq.getMaxConcurrentForTaskType(MaintenanceTaskType("erasure_coding")) + if maxConcurrent != 1 { + t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent) + } + + maxConcurrent = mq.getMaxConcurrentForTaskType(MaintenanceTaskType("vacuum")) + if maxConcurrent != 1 { + t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent) + } +} + +// Test edge cases and error conditions +func TestCanScheduleTaskNow_NilTask(t *testing.T) { + mq := &MaintenanceQueue{ + tasks: make(map[string]*MaintenanceTask), + pendingTasks: []*MaintenanceTask{}, + workers: make(map[string]*MaintenanceWorker), + policy: nil, + integration: nil, + } + + // This should panic with a nil task, so we expect and catch the panic + defer func() { + if r := recover(); r == nil { + t.Errorf("Expected canScheduleTaskNow to panic with nil task, but it didn't") + } + }() + + // This should panic + mq.canScheduleTaskNow(nil) +} + +func TestCanScheduleTaskNow_EmptyTaskType(t *testing.T) { + mq := &MaintenanceQueue{ + tasks: make(map[string]*MaintenanceTask), + pendingTasks: []*MaintenanceTask{}, + workers: make(map[string]*MaintenanceWorker), + policy: nil, + integration: nil, + } + + task := &MaintenanceTask{ + ID: "empty-type-task", + Type: MaintenanceTaskType(""), // Empty task type + Status: TaskStatusPending, + } + + // Should handle empty task type gracefully + result := mq.canScheduleTaskNow(task) + if !result { + 
t.Errorf("Expected canScheduleTaskNow to handle empty task type, got false") + } +} + +func TestCanScheduleTaskNow_WithPolicy(t *testing.T) { + // Test with a policy that allows higher concurrency + policy := &MaintenancePolicy{ + TaskPolicies: map[string]*worker_pb.TaskPolicy{ + string(MaintenanceTaskType("erasure_coding")): { + Enabled: true, + MaxConcurrent: 3, + RepeatIntervalSeconds: 60 * 60, // 1 hour + CheckIntervalSeconds: 60 * 60, // 1 hour + }, + string(MaintenanceTaskType("vacuum")): { + Enabled: true, + MaxConcurrent: 2, + RepeatIntervalSeconds: 60 * 60, // 1 hour + CheckIntervalSeconds: 60 * 60, // 1 hour + }, + }, + GlobalMaxConcurrent: 10, + DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds + DefaultCheckIntervalSeconds: 60 * 60, // 1 hour in seconds + } + + mq := &MaintenanceQueue{ + tasks: map[string]*MaintenanceTask{ + "running-task-1": { + ID: "running-task-1", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusInProgress, + }, + "running-task-2": { + ID: "running-task-2", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusAssigned, + }, + }, + pendingTasks: []*MaintenanceTask{}, + workers: make(map[string]*MaintenanceWorker), + policy: policy, + integration: nil, + } + + task := &MaintenanceTask{ + ID: "test-task-policy", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusPending, + } + + // Should return true because we have 2 running EC tasks but max is 3 + result := mq.canScheduleTaskNow(task) + if !result { + t.Errorf("Expected canScheduleTaskNow to return true with policy allowing 3 concurrent, got false") + } + + // Add one more running task to reach the limit + mq.tasks["running-task-3"] = &MaintenanceTask{ + ID: "running-task-3", + Type: MaintenanceTaskType("erasure_coding"), + Status: TaskStatusInProgress, + } + + // Should return false because we now have 3 running EC tasks (at limit) + result = mq.canScheduleTaskNow(task) + if result { + t.Errorf("Expected 
canScheduleTaskNow to return false when at policy limit, got true") + } +} |
