| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-30 12:38:03 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-30 12:38:03 -0700 |
| commit | 891a2fb6ebc324329f5330a140b8cacff3899db4 (patch) | |
| tree | d02aaa80a909e958aea831f206b3240b0237d7b7 /weed/admin/maintenance/maintenance_queue.go | |
| parent | 64198dad8346fe284cbef944fe01ff0d062c147d (diff) | |
| download | seaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.tar.xz seaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.zip | |
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design
* added simulation as tests
* reorganized the codebase to move the simulation framework and tests into their own dedicated package
* integration test. ec worker task
* remove "enhanced" reference
* start master, volume servers, filer
Current Status
✅ Master: Healthy and running (port 9333)
✅ Filer: Healthy and running (port 8888)
✅ Volume Servers: All 6 servers running (ports 8080-8085)
🔄 Admin/Workers: Will start when dependencies are ready
* generate write load
* tasks are assigned
* admin starts with grpc port. worker has its own working directory
* Update .gitignore
* working worker and admin. Task detection is not working yet.
* compiles, detection uses volumeSizeLimitMB from master
* compiles
* worker retries connecting to admin
* build and restart
* rendering pending tasks
* skip task ID column
* sticky worker id
* test canScheduleTaskNow
* worker reconnect to admin
* clean up logs
* worker register itself first
* worker can run ec work and report status
but:
1. one volume should not be repeatedly worked on (a dedup check for this is sketched after this list).
2. ec shards need to be distributed and the source data should be deleted.
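The first issue is what the hasDuplicateTask guard in the diff below addresses. A minimal standalone sketch of that check, with types simplified from the real MaintenanceTask (not the project's actual definitions):

```go
package main

import "fmt"

// Simplified stand-ins for the queue types used in the diff further below.
type TaskStatus string

const (
	TaskStatusPending    TaskStatus = "pending"
	TaskStatusAssigned   TaskStatus = "assigned"
	TaskStatusInProgress TaskStatus = "in_progress"
)

type Task struct {
	Type     string
	VolumeID uint32
	Server   string
	Status   TaskStatus
}

// hasDuplicate reports whether an equivalent task is already queued or running,
// so the same volume is not repeatedly worked on.
func hasDuplicate(existing []*Task, candidate *Task) bool {
	for _, t := range existing {
		if t.Type == candidate.Type && t.VolumeID == candidate.VolumeID && t.Server == candidate.Server &&
			(t.Status == TaskStatusPending || t.Status == TaskStatusAssigned || t.Status == TaskStatusInProgress) {
			return true
		}
	}
	return false
}

func main() {
	queued := []*Task{{Type: "erasure_coding", VolumeID: 7, Server: "vol1:8080", Status: TaskStatusPending}}
	fmt.Println(hasDuplicate(queued, &Task{Type: "erasure_coding", VolumeID: 7, Server: "vol1:8080"})) // true
}
```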
* move ec task logic
* listing ec shards
* local copy, ec. Need to distribute.
* ec is mostly working now
* distribution of ec shards needs improvement
* need configuration to enable ec
* show ec volumes
* interval field UI component
* rename
* integration test with vacuuming
* garbage percentage threshold
* fix warning
* display ec shard sizes
* fix ec volumes list
* Update ui.go
* show default values
* ensure correct default value
* MaintenanceConfig use ConfigField
* use schema defined defaults
* config
* reduce duplication
* refactor to use BaseUIProvider
* each task register its schema
* checkECEncodingCandidate use ecDetector
* use vacuumDetector
* use volumeSizeLimitMB
* remove
* remove unused
* refactor
* use new framework
* remove v2 reference
* refactor
* left menu can scroll now
* The maintenance manager was not being initialized when no data directory was configured for persistent storage.
* saving config
* Update task_config_schema_templ.go
* enable/disable tasks
* protobuf encoded task configurations
* fix system settings
* use ui component
* remove logs
* interface{} Reduction
* reduce interface{}
* reduce interface{}
* avoid from/to map
* reduce interface{}
* refactor
* keep it DRY
* added logging
* debug messages
* debug level
* debug
* show the log caller line
* use configured task policy
* log level
* handle admin heartbeat response
* Update worker.go
* fix EC rack and dc count
* Report task status to admin server
* fix task logging, simplify interface checking, use erasure_coding constants
* factor in empty volume server during task planning
* volume.list adds disk id
* track disk id also
* fix locking scheduled and manual scanning
* add active topology
* simplify task detector
* ec task completed, but shards are not showing up
* implement ec in ec_typed.go
* adjust log level
* dedup
* implementing ec copying shards and only ecx files
* use disk id when distributing ec shards
🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned
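A minimal sketch of the volume-server end of this flow. Apart from VolumeEcShardsCopyRequest, DiskId, and the Locations-indexed-by-disk-id idea named above, the types here are simplified stand-ins, not the actual SeaweedFS definitions:

```go
package main

import "fmt"

// Hypothetical, simplified mirrors of the structures referenced in this commit:
// a copy request carrying a disk_id, and a store whose locations are indexed by disk id.
type VolumeEcShardsCopyRequest struct {
	VolumeId uint32
	ShardIds []uint32
	DiskId   uint32 // chosen by ActiveTopology planning and carried through the task
}

type DiskLocation struct{ Directory string }

type Store struct{ Locations []*DiskLocation }

// targetDirectory resolves where copied EC shards should land: the exact disk
// directory that was planned, rather than an arbitrary location.
func (s *Store) targetDirectory(req *VolumeEcShardsCopyRequest) (string, error) {
	if int(req.DiskId) >= len(s.Locations) {
		return "", fmt.Errorf("disk id %d out of range (%d disks)", req.DiskId, len(s.Locations))
	}
	return s.Locations[req.DiskId].Directory, nil
}

func main() {
	store := &Store{Locations: []*DiskLocation{{"/data/disk0"}, {"/data/disk1"}}}
	req := &VolumeEcShardsCopyRequest{VolumeId: 7, ShardIds: []uint32{0, 1, 2}, DiskId: 1}
	dir, _ := store.targetDirectory(req)
	fmt.Println(dir) // /data/disk1
}
```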
* Delete original volume from all locations
* clean up existing shard locations
* local encoding and distributing
* Update docker/admin_integration/EC-TESTING-README.md
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* check volume id range
* simplify
* fix tests
* fix types
* clean up logs and tests
---------
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Diffstat (limited to 'weed/admin/maintenance/maintenance_queue.go')
| -rw-r--r-- | weed/admin/maintenance/maintenance_queue.go | 352 |
1 file changed, 305 insertions, 47 deletions
diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go
index 580a98718..ca402bd4d 100644
--- a/weed/admin/maintenance/maintenance_queue.go
+++ b/weed/admin/maintenance/maintenance_queue.go
@@ -1,10 +1,13 @@
 package maintenance

 import (
+	"crypto/rand"
+	"fmt"
 	"sort"
 	"time"

 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 )

 // NewMaintenanceQueue creates a new maintenance queue
@@ -24,11 +27,18 @@ func (mq *MaintenanceQueue) SetIntegration(integration *MaintenanceIntegration)
 	glog.V(1).Infof("Maintenance queue configured with integration")
 }

-// AddTask adds a new maintenance task to the queue
+// AddTask adds a new maintenance task to the queue with deduplication
 func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
 	mq.mutex.Lock()
 	defer mq.mutex.Unlock()

+	// Check for duplicate tasks (same type + volume + not completed)
+	if mq.hasDuplicateTask(task) {
+		glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)",
+			task.Type, task.VolumeID, task.Server)
+		return
+	}
+
 	task.ID = generateTaskID()
 	task.Status = TaskStatusPending
 	task.CreatedAt = time.Now()
@@ -45,19 +55,48 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
 		return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
 	})

-	glog.V(2).Infof("Added maintenance task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
+	scheduleInfo := ""
+	if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute {
+		scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05"))
+	}
+
+	glog.Infof("Task queued: %s (%s) volume %d on %s, priority %d%s, reason: %s",
+		task.ID, task.Type, task.VolumeID, task.Server, task.Priority, scheduleInfo, task.Reason)
+}
+
+// hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed)
+func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool {
+	for _, existingTask := range mq.tasks {
+		if existingTask.Type == newTask.Type &&
+			existingTask.VolumeID == newTask.VolumeID &&
+			existingTask.Server == newTask.Server &&
+			(existingTask.Status == TaskStatusPending ||
+				existingTask.Status == TaskStatusAssigned ||
+				existingTask.Status == TaskStatusInProgress) {
+			return true
+		}
+	}
+	return false
 }

 // AddTasksFromResults converts detection results to tasks and adds them to the queue
 func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) {
 	for _, result := range results {
+		// Validate that task has proper typed parameters
+		if result.TypedParams == nil {
+			glog.Warningf("Rejecting invalid task: %s for volume %d on %s - no typed parameters (insufficient destinations or planning failed)",
+				result.TaskType, result.VolumeID, result.Server)
+			continue
+		}
+
 		task := &MaintenanceTask{
-			Type:        result.TaskType,
-			Priority:    result.Priority,
-			VolumeID:    result.VolumeID,
-			Server:      result.Server,
-			Collection:  result.Collection,
-			Parameters:  result.Parameters,
+			Type:       result.TaskType,
+			Priority:   result.Priority,
+			VolumeID:   result.VolumeID,
+			Server:     result.Server,
+			Collection: result.Collection,
+			// Copy typed protobuf parameters
+			TypedParams: result.TypedParams,
 			Reason:      result.Reason,
 			ScheduledAt: result.ScheduleAt,
 		}
@@ -67,57 +106,92 @@ func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult)

 // GetNextTask returns the next available task for a worker
 func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
-	mq.mutex.Lock()
-	defer mq.mutex.Unlock()
+	// Use read lock for initial checks and search
+	mq.mutex.RLock()

 	worker, exists := mq.workers[workerID]
 	if !exists {
+		mq.mutex.RUnlock()
+		glog.V(2).Infof("Task assignment failed for worker %s: worker not registered", workerID)
 		return nil
 	}

 	// Check if worker has capacity
 	if worker.CurrentLoad >= worker.MaxConcurrent {
+		mq.mutex.RUnlock()
+		glog.V(2).Infof("Task assignment failed for worker %s: at capacity (%d/%d)", workerID, worker.CurrentLoad, worker.MaxConcurrent)
 		return nil
 	}

 	now := time.Now()
+	var selectedTask *MaintenanceTask
+	var selectedIndex int = -1

-	// Find the next suitable task
+	// Find the next suitable task (using read lock)
 	for i, task := range mq.pendingTasks {
 		// Check if it's time to execute the task
 		if task.ScheduledAt.After(now) {
+			glog.V(3).Infof("Task %s skipped for worker %s: scheduled for future (%v)", task.ID, workerID, task.ScheduledAt)
 			continue
 		}

 		// Check if worker can handle this task type
 		if !mq.workerCanHandle(task.Type, capabilities) {
+			glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities)
 			continue
 		}

-		// Check scheduling logic - use simplified system if available, otherwise fallback
+		// Check if this task type needs a cooldown period
 		if !mq.canScheduleTaskNow(task) {
+			glog.V(3).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met", task.ID, task.Type, workerID)
 			continue
 		}

-		// Assign task to worker
-		task.Status = TaskStatusAssigned
-		task.WorkerID = workerID
-		startTime := now
-		task.StartedAt = &startTime
+		// Found a suitable task
+		selectedTask = task
+		selectedIndex = i
+		break
+	}

-		// Remove from pending tasks
-		mq.pendingTasks = append(mq.pendingTasks[:i], mq.pendingTasks[i+1:]...)
+	// Release read lock
+	mq.mutex.RUnlock()

-		// Update worker
-		worker.CurrentTask = task
-		worker.CurrentLoad++
-		worker.Status = "busy"
+	// If no task found, return nil
+	if selectedTask == nil {
+		glog.V(2).Infof("No suitable tasks available for worker %s (checked %d pending tasks)", workerID, len(mq.pendingTasks))
+		return nil
+	}
+
+	// Now acquire write lock to actually assign the task
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	// Re-check that the task is still available (it might have been assigned to another worker)
+	if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTask.ID {
+		glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTask.ID, workerID)
+		return nil
+	}
+
+	// Assign the task
+	selectedTask.Status = TaskStatusAssigned
+	selectedTask.WorkerID = workerID
+	selectedTask.StartedAt = &now
+
+	// Remove from pending tasks
+	mq.pendingTasks = append(mq.pendingTasks[:selectedIndex], mq.pendingTasks[selectedIndex+1:]...)

-		glog.V(2).Infof("Assigned task %s to worker %s", task.ID, workerID)
-		return task
+	// Update worker load
+	if worker, exists := mq.workers[workerID]; exists {
+		worker.CurrentLoad++
 	}

-	return nil
+	// Track pending operation
+	mq.trackPendingOperation(selectedTask)
+
+	glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)",
+		selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server)
+
+	return selectedTask
 }

 // CompleteTask marks a task as completed
@@ -127,12 +201,19 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 	task, exists := mq.tasks[taskID]
 	if !exists {
+		glog.Warningf("Attempted to complete non-existent task: %s", taskID)
 		return
 	}

 	completedTime := time.Now()
 	task.CompletedAt = &completedTime

+	// Calculate task duration
+	var duration time.Duration
+	if task.StartedAt != nil {
+		duration = completedTime.Sub(*task.StartedAt)
+	}
+
 	if error != "" {
 		task.Status = TaskStatusFailed
 		task.Error = error
@@ -148,14 +229,17 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 			task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
 			mq.pendingTasks = append(mq.pendingTasks, task)

-			glog.V(2).Infof("Retrying task %s (attempt %d/%d)", taskID, task.RetryCount, task.MaxRetries)
+			glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
+				taskID, task.Type, task.RetryCount, task.MaxRetries, task.WorkerID, duration, error)
 		} else {
-			glog.Errorf("Task %s failed permanently after %d retries: %s", taskID, task.MaxRetries, error)
+			glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
+				taskID, task.Type, task.WorkerID, duration, task.MaxRetries, error)
 		}
 	} else {
 		task.Status = TaskStatusCompleted
 		task.Progress = 100
-		glog.V(2).Infof("Task %s completed successfully", taskID)
+		glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
+			taskID, task.Type, task.WorkerID, duration, task.VolumeID)
 	}

 	// Update worker
@@ -168,6 +252,11 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
 			}
 		}
 	}
+
+	// Remove pending operation (unless it's being retried)
+	if task.Status != TaskStatusPending {
+		mq.removePendingOperation(taskID)
+	}
 }

 // UpdateTaskProgress updates the progress of a running task
@@ -176,8 +265,26 @@ func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64)
 	defer mq.mutex.RUnlock()

 	if task, exists := mq.tasks[taskID]; exists {
+		oldProgress := task.Progress
 		task.Progress = progress
 		task.Status = TaskStatusInProgress
+
+		// Update pending operation status
+		mq.updatePendingOperationStatus(taskID, "in_progress")
+
+		// Log progress at significant milestones or changes
+		if progress == 0 {
+			glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d",
+				taskID, task.Type, task.WorkerID, task.VolumeID)
+		} else if progress >= 100 {
+			glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
+				taskID, task.Type, task.WorkerID, progress)
+		} else if progress-oldProgress >= 25 { // Log every 25% increment
+			glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
+				taskID, task.Type, task.WorkerID, progress)
+		}
+	} else {
+		glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
 	}
 }

@@ -186,12 +293,25 @@ func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) {
 	mq.mutex.Lock()
 	defer mq.mutex.Unlock()

+	isNewWorker := true
+	if existingWorker, exists := mq.workers[worker.ID]; exists {
+		isNewWorker = false
+		glog.Infof("Worker reconnected: %s at %s (capabilities: %v, max concurrent: %d)",
+			worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
+
+		// Preserve current load when reconnecting
+		worker.CurrentLoad = existingWorker.CurrentLoad
+	} else {
+		glog.Infof("Worker registered: %s at %s (capabilities: %v, max concurrent: %d)",
+			worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
+	}
+
 	worker.LastHeartbeat = time.Now()
 	worker.Status = "active"
-	worker.CurrentLoad = 0
+	if isNewWorker {
+		worker.CurrentLoad = 0
+	}
 	mq.workers[worker.ID] = worker
-
-	glog.V(1).Infof("Registered maintenance worker %s at %s", worker.ID, worker.Address)
 }

 // UpdateWorkerHeartbeat updates worker heartbeat
@@ -200,7 +320,15 @@ func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) {
 	defer mq.mutex.Unlock()

 	if worker, exists := mq.workers[workerID]; exists {
+		lastSeen := worker.LastHeartbeat
 		worker.LastHeartbeat = time.Now()
+
+		// Log if worker was offline for a while
+		if time.Since(lastSeen) > 2*time.Minute {
+			glog.Infof("Worker %s heartbeat resumed after %v", workerID, time.Since(lastSeen))
+		}
+	} else {
+		glog.V(2).Infof("Heartbeat from unknown worker: %s", workerID)
 	}
 }

@@ -255,7 +383,7 @@ func (mq *MaintenanceQueue) getRepeatPreventionInterval(taskType MaintenanceTask

 	// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
 	if mq.policy != nil {
-		repeatIntervalHours := mq.policy.GetRepeatInterval(taskType)
+		repeatIntervalHours := GetRepeatInterval(mq.policy, taskType)
 		if repeatIntervalHours > 0 {
 			interval := time.Duration(repeatIntervalHours) * time.Hour
 			glog.V(3).Infof("Using policy configuration repeat interval for %s: %v", taskType, interval)
@@ -311,10 +439,23 @@ func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker {
 func generateTaskID() string {
 	const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
 	b := make([]byte, 8)
+	randBytes := make([]byte, 8)
+
+	// Generate random bytes
+	if _, err := rand.Read(randBytes); err != nil {
+		// Fallback to timestamp-based ID if crypto/rand fails
+		timestamp := time.Now().UnixNano()
+		return fmt.Sprintf("task-%d", timestamp)
+	}
+
+	// Convert random bytes to charset
 	for i := range b {
-		b[i] = charset[i%len(charset)]
+		b[i] = charset[int(randBytes[i])%len(charset)]
 	}
-	return string(b)
+
+	// Add timestamp suffix to ensure uniqueness
+	timestamp := time.Now().Unix() % 10000 // last 4 digits of timestamp
+	return fmt.Sprintf("%s-%04d", string(b), timestamp)
 }

 // CleanupOldTasks removes old completed and failed tasks
@@ -427,19 +568,31 @@ func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabi

 // canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
 func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
-	// Try task scheduling logic first
-	if mq.integration != nil {
-		// Get all running tasks and available workers
-		runningTasks := mq.getRunningTasks()
-		availableWorkers := mq.getAvailableWorkers()
+	glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)

-		canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
-		glog.V(3).Infof("Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
-		return canSchedule
-	}
+	// TEMPORARY FIX: Skip integration task scheduler which is being overly restrictive
+	// Use fallback logic directly for now
+	glog.V(2).Infof("Using fallback logic for task scheduling")
+	canExecute := mq.canExecuteTaskType(task.Type)
+	glog.V(2).Infof("Fallback decision for task %s: %v", task.ID, canExecute)
+	return canExecute

-	// Fallback to hardcoded logic
-	return mq.canExecuteTaskType(task.Type)
+	// NOTE: Original integration code disabled temporarily
+	// Try task scheduling logic first
+	/*
+		if mq.integration != nil {
+			glog.Infof("DEBUG canScheduleTaskNow: Using integration task scheduler")
+			// Get all running tasks and available workers
+			runningTasks := mq.getRunningTasks()
+			availableWorkers := mq.getAvailableWorkers()
+
+			glog.Infof("DEBUG canScheduleTaskNow: Running tasks: %d, Available workers: %d", len(runningTasks), len(availableWorkers))
+
+			canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
+			glog.Infof("DEBUG canScheduleTaskNow: Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
+			return canSchedule
+		}
+	*/
 }

 // canExecuteTaskType checks if we can execute more tasks of this type (concurrency limits) - fallback logic
@@ -465,7 +618,7 @@ func (mq *MaintenanceQueue) getMaxConcurrentForTaskType(taskType MaintenanceTask

 	// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
 	if mq.policy != nil {
-		maxConcurrent := mq.policy.GetMaxConcurrent(taskType)
+		maxConcurrent := GetMaxConcurrent(mq.policy, taskType)
 		if maxConcurrent > 0 {
 			glog.V(3).Infof("Using policy configuration max concurrent for %s: %d", taskType, maxConcurrent)
 			return maxConcurrent
@@ -498,3 +651,108 @@ func (mq *MaintenanceQueue) getAvailableWorkers() []*MaintenanceWorker {
 	}
 	return availableWorkers
 }
+
+// trackPendingOperation adds a task to the pending operations tracker
+func (mq *MaintenanceQueue) trackPendingOperation(task *MaintenanceTask) {
+	if mq.integration == nil {
+		return
+	}
+
+	pendingOps := mq.integration.GetPendingOperations()
+	if pendingOps == nil {
+		return
+	}
+
+	// Skip tracking for tasks without proper typed parameters
+	if task.TypedParams == nil {
+		glog.V(2).Infof("Skipping pending operation tracking for task %s - no typed parameters", task.ID)
+		return
+	}
+
+	// Map maintenance task type to pending operation type
+	var opType PendingOperationType
+	switch task.Type {
+	case MaintenanceTaskType("balance"):
+		opType = OpTypeVolumeBalance
+	case MaintenanceTaskType("erasure_coding"):
+		opType = OpTypeErasureCoding
+	case MaintenanceTaskType("vacuum"):
+		opType = OpTypeVacuum
+	case MaintenanceTaskType("replication"):
+		opType = OpTypeReplication
+	default:
+		opType = OpTypeVolumeMove
+	}
+
+	// Determine destination node and estimated size from typed parameters
+	destNode := ""
+	estimatedSize := uint64(1024 * 1024 * 1024) // Default 1GB estimate
+
+	switch params := task.TypedParams.TaskParams.(type) {
+	case *worker_pb.TaskParams_ErasureCodingParams:
+		if params.ErasureCodingParams != nil {
+			if len(params.ErasureCodingParams.Destinations) > 0 {
+				destNode = params.ErasureCodingParams.Destinations[0].Node
+			}
+			if params.ErasureCodingParams.EstimatedShardSize > 0 {
+				estimatedSize = params.ErasureCodingParams.EstimatedShardSize
+			}
+		}
+	case *worker_pb.TaskParams_BalanceParams:
+		if params.BalanceParams != nil {
+			destNode = params.BalanceParams.DestNode
+			if params.BalanceParams.EstimatedSize > 0 {
+				estimatedSize = params.BalanceParams.EstimatedSize
+			}
+		}
+	case *worker_pb.TaskParams_ReplicationParams:
+		if params.ReplicationParams != nil {
+			destNode = params.ReplicationParams.DestNode
+			if params.ReplicationParams.EstimatedSize > 0 {
+				estimatedSize = params.ReplicationParams.EstimatedSize
+			}
+		}
+	}
+
+	operation := &PendingOperation{
+		VolumeID:      task.VolumeID,
+		OperationType: opType,
+		SourceNode:    task.Server,
+		DestNode:      destNode,
+		TaskID:        task.ID,
+		StartTime:     time.Now(),
+		EstimatedSize: estimatedSize,
+		Collection:    task.Collection,
+		Status:        "assigned",
+	}
+
+	pendingOps.AddOperation(operation)
+}
+
+// removePendingOperation removes a task from the pending operations tracker
+func (mq *MaintenanceQueue) removePendingOperation(taskID string) {
+	if mq.integration == nil {
+		return
+	}
+
+	pendingOps := mq.integration.GetPendingOperations()
+	if pendingOps == nil {
+		return
+	}
+
+	pendingOps.RemoveOperation(taskID)
+}
+
+// updatePendingOperationStatus updates the status of a pending operation
+func (mq *MaintenanceQueue) updatePendingOperationStatus(taskID string, status string) {
+	if mq.integration == nil {
+		return
+	}
+
+	pendingOps := mq.integration.GetPendingOperations()
+	if pendingOps == nil {
+		return
+	}
+
+	pendingOps.UpdateOperationStatus(taskID, status)
+}
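For orientation, a hedged sketch of how the methods in this diff fit together, written as if inside the maintenance package and using the signatures shown above. Queue and worker construction, and the contents of the protobuf task parameters, are assumptions and elided here:

```go
// runOneTask walks a task through the queue lifecycle defined in this file.
// It assumes mq was constructed elsewhere (e.g. via NewMaintenanceQueue) and that
// a worker with ID "worker-1" and erasure_coding capability is already registered.
func runOneTask(mq *MaintenanceQueue, params *worker_pb.TaskParams) {
	// Detection results without TypedParams are rejected by AddTasksFromResults,
	// so a valid task always carries typed protobuf parameters.
	mq.AddTask(&MaintenanceTask{
		Type:        MaintenanceTaskType("erasure_coding"),
		VolumeID:    7,
		Server:      "vol1:8080",
		TypedParams: params,
		Reason:      "volume exceeds size limit",
	})

	// A registered worker with matching capabilities pulls the next assignable task.
	task := mq.GetNextTask("worker-1", []MaintenanceTaskType{MaintenanceTaskType("erasure_coding")})
	if task == nil {
		return // no capacity, capability mismatch, or scheduling constraints
	}

	// The worker reports progress; milestones are logged and the
	// pending-operation status is updated.
	mq.UpdateTaskProgress(task.ID, 50)

	// An empty error string marks success; a non-empty one triggers the
	// retry/backoff path in CompleteTask.
	mq.CompleteTask(task.ID, "")
}
```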
