aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/maintenance/maintenance_queue.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-07-30 12:38:03 -0700
committerGitHub <noreply@github.com>2025-07-30 12:38:03 -0700
commit891a2fb6ebc324329f5330a140b8cacff3899db4 (patch)
treed02aaa80a909e958aea831f206b3240b0237d7b7 /weed/admin/maintenance/maintenance_queue.go
parent64198dad8346fe284cbef944fe01ff0d062c147d (diff)
downloadseaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.tar.xz
seaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.zip
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design * added simulation as tests * reorganized the codebase to move the simulation framework and tests into their own dedicated package * integration test. ec worker task * remove "enhanced" reference * start master, volume servers, filer Current Status ✅ Master: Healthy and running (port 9333) ✅ Filer: Healthy and running (port 8888) ✅ Volume Servers: All 6 servers running (ports 8080-8085) 🔄 Admin/Workers: Will start when dependencies are ready * generate write load * tasks are assigned * admin start wtih grpc port. worker has its own working directory * Update .gitignore * working worker and admin. Task detection is not working yet. * compiles, detection uses volumeSizeLimitMB from master * compiles * worker retries connecting to admin * build and restart * rendering pending tasks * skip task ID column * sticky worker id * test canScheduleTaskNow * worker reconnect to admin * clean up logs * worker register itself first * worker can run ec work and report status but: 1. one volume should not be repeatedly worked on. 2. ec shards needs to be distributed and source data should be deleted. * move ec task logic * listing ec shards * local copy, ec. Need to distribute. * ec is mostly working now * distribution of ec shards needs improvement * need configuration to enable ec * show ec volumes * interval field UI component * rename * integration test with vauuming * garbage percentage threshold * fix warning * display ec shard sizes * fix ec volumes list * Update ui.go * show default values * ensure correct default value * MaintenanceConfig use ConfigField * use schema defined defaults * config * reduce duplication * refactor to use BaseUIProvider * each task register its schema * checkECEncodingCandidate use ecDetector * use vacuumDetector * use volumeSizeLimitMB * remove remove * remove unused * refactor * use new framework * remove v2 reference * refactor * left menu can scroll now * The maintenance manager was not being initialized when no data directory was configured for persistent storage. * saving config * Update task_config_schema_templ.go * enable/disable tasks * protobuf encoded task configurations * fix system settings * use ui component * remove logs * interface{} Reduction * reduce interface{} * reduce interface{} * avoid from/to map * reduce interface{} * refactor * keep it DRY * added logging * debug messages * debug level * debug * show the log caller line * use configured task policy * log level * handle admin heartbeat response * Update worker.go * fix EC rack and dc count * Report task status to admin server * fix task logging, simplify interface checking, use erasure_coding constants * factor in empty volume server during task planning * volume.list adds disk id * track disk id also * fix locking scheduled and manual scanning * add active topology * simplify task detector * ec task completed, but shards are not showing up * implement ec in ec_typed.go * adjust log level * dedup * implementing ec copying shards and only ecx files * use disk id when distributing ec shards 🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk 📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId 🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest 💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId]) 📂 File System: EC shards and metadata land in the exact disk directory planned * Delete original volume from all locations * clean up existing shard locations * local encoding and distributing * Update docker/admin_integration/EC-TESTING-README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * check volume id range * simplify * fix tests * fix types * clean up logs and tests --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Diffstat (limited to 'weed/admin/maintenance/maintenance_queue.go')
-rw-r--r--weed/admin/maintenance/maintenance_queue.go352
1 files changed, 305 insertions, 47 deletions
diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go
index 580a98718..ca402bd4d 100644
--- a/weed/admin/maintenance/maintenance_queue.go
+++ b/weed/admin/maintenance/maintenance_queue.go
@@ -1,10 +1,13 @@
package maintenance
import (
+ "crypto/rand"
+ "fmt"
"sort"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)
// NewMaintenanceQueue creates a new maintenance queue
@@ -24,11 +27,18 @@ func (mq *MaintenanceQueue) SetIntegration(integration *MaintenanceIntegration)
glog.V(1).Infof("Maintenance queue configured with integration")
}
-// AddTask adds a new maintenance task to the queue
+// AddTask adds a new maintenance task to the queue with deduplication
func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
+ // Check for duplicate tasks (same type + volume + not completed)
+ if mq.hasDuplicateTask(task) {
+ glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)",
+ task.Type, task.VolumeID, task.Server)
+ return
+ }
+
task.ID = generateTaskID()
task.Status = TaskStatusPending
task.CreatedAt = time.Now()
@@ -45,19 +55,48 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
})
- glog.V(2).Infof("Added maintenance task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
+ scheduleInfo := ""
+ if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute {
+ scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05"))
+ }
+
+ glog.Infof("Task queued: %s (%s) volume %d on %s, priority %d%s, reason: %s",
+ task.ID, task.Type, task.VolumeID, task.Server, task.Priority, scheduleInfo, task.Reason)
+}
+
+// hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed)
+func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool {
+ for _, existingTask := range mq.tasks {
+ if existingTask.Type == newTask.Type &&
+ existingTask.VolumeID == newTask.VolumeID &&
+ existingTask.Server == newTask.Server &&
+ (existingTask.Status == TaskStatusPending ||
+ existingTask.Status == TaskStatusAssigned ||
+ existingTask.Status == TaskStatusInProgress) {
+ return true
+ }
+ }
+ return false
}
// AddTasksFromResults converts detection results to tasks and adds them to the queue
func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) {
for _, result := range results {
+ // Validate that task has proper typed parameters
+ if result.TypedParams == nil {
+ glog.Warningf("Rejecting invalid task: %s for volume %d on %s - no typed parameters (insufficient destinations or planning failed)",
+ result.TaskType, result.VolumeID, result.Server)
+ continue
+ }
+
task := &MaintenanceTask{
- Type: result.TaskType,
- Priority: result.Priority,
- VolumeID: result.VolumeID,
- Server: result.Server,
- Collection: result.Collection,
- Parameters: result.Parameters,
+ Type: result.TaskType,
+ Priority: result.Priority,
+ VolumeID: result.VolumeID,
+ Server: result.Server,
+ Collection: result.Collection,
+ // Copy typed protobuf parameters
+ TypedParams: result.TypedParams,
Reason: result.Reason,
ScheduledAt: result.ScheduleAt,
}
@@ -67,57 +106,92 @@ func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult)
// GetNextTask returns the next available task for a worker
func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
- mq.mutex.Lock()
- defer mq.mutex.Unlock()
+ // Use read lock for initial checks and search
+ mq.mutex.RLock()
worker, exists := mq.workers[workerID]
if !exists {
+ mq.mutex.RUnlock()
+ glog.V(2).Infof("Task assignment failed for worker %s: worker not registered", workerID)
return nil
}
// Check if worker has capacity
if worker.CurrentLoad >= worker.MaxConcurrent {
+ mq.mutex.RUnlock()
+ glog.V(2).Infof("Task assignment failed for worker %s: at capacity (%d/%d)", workerID, worker.CurrentLoad, worker.MaxConcurrent)
return nil
}
now := time.Now()
+ var selectedTask *MaintenanceTask
+ var selectedIndex int = -1
- // Find the next suitable task
+ // Find the next suitable task (using read lock)
for i, task := range mq.pendingTasks {
// Check if it's time to execute the task
if task.ScheduledAt.After(now) {
+ glog.V(3).Infof("Task %s skipped for worker %s: scheduled for future (%v)", task.ID, workerID, task.ScheduledAt)
continue
}
// Check if worker can handle this task type
if !mq.workerCanHandle(task.Type, capabilities) {
+ glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities)
continue
}
- // Check scheduling logic - use simplified system if available, otherwise fallback
+ // Check if this task type needs a cooldown period
if !mq.canScheduleTaskNow(task) {
+ glog.V(3).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met", task.ID, task.Type, workerID)
continue
}
- // Assign task to worker
- task.Status = TaskStatusAssigned
- task.WorkerID = workerID
- startTime := now
- task.StartedAt = &startTime
+ // Found a suitable task
+ selectedTask = task
+ selectedIndex = i
+ break
+ }
- // Remove from pending tasks
- mq.pendingTasks = append(mq.pendingTasks[:i], mq.pendingTasks[i+1:]...)
+ // Release read lock
+ mq.mutex.RUnlock()
- // Update worker
- worker.CurrentTask = task
- worker.CurrentLoad++
- worker.Status = "busy"
+ // If no task found, return nil
+ if selectedTask == nil {
+ glog.V(2).Infof("No suitable tasks available for worker %s (checked %d pending tasks)", workerID, len(mq.pendingTasks))
+ return nil
+ }
+
+ // Now acquire write lock to actually assign the task
+ mq.mutex.Lock()
+ defer mq.mutex.Unlock()
+
+ // Re-check that the task is still available (it might have been assigned to another worker)
+ if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTask.ID {
+ glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTask.ID, workerID)
+ return nil
+ }
+
+ // Assign the task
+ selectedTask.Status = TaskStatusAssigned
+ selectedTask.WorkerID = workerID
+ selectedTask.StartedAt = &now
+
+ // Remove from pending tasks
+ mq.pendingTasks = append(mq.pendingTasks[:selectedIndex], mq.pendingTasks[selectedIndex+1:]...)
- glog.V(2).Infof("Assigned task %s to worker %s", task.ID, workerID)
- return task
+ // Update worker load
+ if worker, exists := mq.workers[workerID]; exists {
+ worker.CurrentLoad++
}
- return nil
+ // Track pending operation
+ mq.trackPendingOperation(selectedTask)
+
+ glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)",
+ selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server)
+
+ return selectedTask
}
// CompleteTask marks a task as completed
@@ -127,12 +201,19 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
task, exists := mq.tasks[taskID]
if !exists {
+ glog.Warningf("Attempted to complete non-existent task: %s", taskID)
return
}
completedTime := time.Now()
task.CompletedAt = &completedTime
+ // Calculate task duration
+ var duration time.Duration
+ if task.StartedAt != nil {
+ duration = completedTime.Sub(*task.StartedAt)
+ }
+
if error != "" {
task.Status = TaskStatusFailed
task.Error = error
@@ -148,14 +229,17 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
mq.pendingTasks = append(mq.pendingTasks, task)
- glog.V(2).Infof("Retrying task %s (attempt %d/%d)", taskID, task.RetryCount, task.MaxRetries)
+ glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
+ taskID, task.Type, task.RetryCount, task.MaxRetries, task.WorkerID, duration, error)
} else {
- glog.Errorf("Task %s failed permanently after %d retries: %s", taskID, task.MaxRetries, error)
+ glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
+ taskID, task.Type, task.WorkerID, duration, task.MaxRetries, error)
}
} else {
task.Status = TaskStatusCompleted
task.Progress = 100
- glog.V(2).Infof("Task %s completed successfully", taskID)
+ glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
+ taskID, task.Type, task.WorkerID, duration, task.VolumeID)
}
// Update worker
@@ -168,6 +252,11 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
}
}
}
+
+ // Remove pending operation (unless it's being retried)
+ if task.Status != TaskStatusPending {
+ mq.removePendingOperation(taskID)
+ }
}
// UpdateTaskProgress updates the progress of a running task
@@ -176,8 +265,26 @@ func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64)
defer mq.mutex.RUnlock()
if task, exists := mq.tasks[taskID]; exists {
+ oldProgress := task.Progress
task.Progress = progress
task.Status = TaskStatusInProgress
+
+ // Update pending operation status
+ mq.updatePendingOperationStatus(taskID, "in_progress")
+
+ // Log progress at significant milestones or changes
+ if progress == 0 {
+ glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d",
+ taskID, task.Type, task.WorkerID, task.VolumeID)
+ } else if progress >= 100 {
+ glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
+ taskID, task.Type, task.WorkerID, progress)
+ } else if progress-oldProgress >= 25 { // Log every 25% increment
+ glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
+ taskID, task.Type, task.WorkerID, progress)
+ }
+ } else {
+ glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
}
}
@@ -186,12 +293,25 @@ func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
+ isNewWorker := true
+ if existingWorker, exists := mq.workers[worker.ID]; exists {
+ isNewWorker = false
+ glog.Infof("Worker reconnected: %s at %s (capabilities: %v, max concurrent: %d)",
+ worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
+
+ // Preserve current load when reconnecting
+ worker.CurrentLoad = existingWorker.CurrentLoad
+ } else {
+ glog.Infof("Worker registered: %s at %s (capabilities: %v, max concurrent: %d)",
+ worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
+ }
+
worker.LastHeartbeat = time.Now()
worker.Status = "active"
- worker.CurrentLoad = 0
+ if isNewWorker {
+ worker.CurrentLoad = 0
+ }
mq.workers[worker.ID] = worker
-
- glog.V(1).Infof("Registered maintenance worker %s at %s", worker.ID, worker.Address)
}
// UpdateWorkerHeartbeat updates worker heartbeat
@@ -200,7 +320,15 @@ func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) {
defer mq.mutex.Unlock()
if worker, exists := mq.workers[workerID]; exists {
+ lastSeen := worker.LastHeartbeat
worker.LastHeartbeat = time.Now()
+
+ // Log if worker was offline for a while
+ if time.Since(lastSeen) > 2*time.Minute {
+ glog.Infof("Worker %s heartbeat resumed after %v", workerID, time.Since(lastSeen))
+ }
+ } else {
+ glog.V(2).Infof("Heartbeat from unknown worker: %s", workerID)
}
}
@@ -255,7 +383,7 @@ func (mq *MaintenanceQueue) getRepeatPreventionInterval(taskType MaintenanceTask
// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
if mq.policy != nil {
- repeatIntervalHours := mq.policy.GetRepeatInterval(taskType)
+ repeatIntervalHours := GetRepeatInterval(mq.policy, taskType)
if repeatIntervalHours > 0 {
interval := time.Duration(repeatIntervalHours) * time.Hour
glog.V(3).Infof("Using policy configuration repeat interval for %s: %v", taskType, interval)
@@ -311,10 +439,23 @@ func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker {
func generateTaskID() string {
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
b := make([]byte, 8)
+ randBytes := make([]byte, 8)
+
+ // Generate random bytes
+ if _, err := rand.Read(randBytes); err != nil {
+ // Fallback to timestamp-based ID if crypto/rand fails
+ timestamp := time.Now().UnixNano()
+ return fmt.Sprintf("task-%d", timestamp)
+ }
+
+ // Convert random bytes to charset
for i := range b {
- b[i] = charset[i%len(charset)]
+ b[i] = charset[int(randBytes[i])%len(charset)]
}
- return string(b)
+
+ // Add timestamp suffix to ensure uniqueness
+ timestamp := time.Now().Unix() % 10000 // last 4 digits of timestamp
+ return fmt.Sprintf("%s-%04d", string(b), timestamp)
}
// CleanupOldTasks removes old completed and failed tasks
@@ -427,19 +568,31 @@ func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabi
// canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
- // Try task scheduling logic first
- if mq.integration != nil {
- // Get all running tasks and available workers
- runningTasks := mq.getRunningTasks()
- availableWorkers := mq.getAvailableWorkers()
+ glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)
- canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
- glog.V(3).Infof("Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
- return canSchedule
- }
+ // TEMPORARY FIX: Skip integration task scheduler which is being overly restrictive
+ // Use fallback logic directly for now
+ glog.V(2).Infof("Using fallback logic for task scheduling")
+ canExecute := mq.canExecuteTaskType(task.Type)
+ glog.V(2).Infof("Fallback decision for task %s: %v", task.ID, canExecute)
+ return canExecute
- // Fallback to hardcoded logic
- return mq.canExecuteTaskType(task.Type)
+ // NOTE: Original integration code disabled temporarily
+ // Try task scheduling logic first
+ /*
+ if mq.integration != nil {
+ glog.Infof("DEBUG canScheduleTaskNow: Using integration task scheduler")
+ // Get all running tasks and available workers
+ runningTasks := mq.getRunningTasks()
+ availableWorkers := mq.getAvailableWorkers()
+
+ glog.Infof("DEBUG canScheduleTaskNow: Running tasks: %d, Available workers: %d", len(runningTasks), len(availableWorkers))
+
+ canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
+ glog.Infof("DEBUG canScheduleTaskNow: Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
+ return canSchedule
+ }
+ */
}
// canExecuteTaskType checks if we can execute more tasks of this type (concurrency limits) - fallback logic
@@ -465,7 +618,7 @@ func (mq *MaintenanceQueue) getMaxConcurrentForTaskType(taskType MaintenanceTask
// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
if mq.policy != nil {
- maxConcurrent := mq.policy.GetMaxConcurrent(taskType)
+ maxConcurrent := GetMaxConcurrent(mq.policy, taskType)
if maxConcurrent > 0 {
glog.V(3).Infof("Using policy configuration max concurrent for %s: %d", taskType, maxConcurrent)
return maxConcurrent
@@ -498,3 +651,108 @@ func (mq *MaintenanceQueue) getAvailableWorkers() []*MaintenanceWorker {
}
return availableWorkers
}
+
+// trackPendingOperation adds a task to the pending operations tracker
+func (mq *MaintenanceQueue) trackPendingOperation(task *MaintenanceTask) {
+ if mq.integration == nil {
+ return
+ }
+
+ pendingOps := mq.integration.GetPendingOperations()
+ if pendingOps == nil {
+ return
+ }
+
+ // Skip tracking for tasks without proper typed parameters
+ if task.TypedParams == nil {
+ glog.V(2).Infof("Skipping pending operation tracking for task %s - no typed parameters", task.ID)
+ return
+ }
+
+ // Map maintenance task type to pending operation type
+ var opType PendingOperationType
+ switch task.Type {
+ case MaintenanceTaskType("balance"):
+ opType = OpTypeVolumeBalance
+ case MaintenanceTaskType("erasure_coding"):
+ opType = OpTypeErasureCoding
+ case MaintenanceTaskType("vacuum"):
+ opType = OpTypeVacuum
+ case MaintenanceTaskType("replication"):
+ opType = OpTypeReplication
+ default:
+ opType = OpTypeVolumeMove
+ }
+
+ // Determine destination node and estimated size from typed parameters
+ destNode := ""
+ estimatedSize := uint64(1024 * 1024 * 1024) // Default 1GB estimate
+
+ switch params := task.TypedParams.TaskParams.(type) {
+ case *worker_pb.TaskParams_ErasureCodingParams:
+ if params.ErasureCodingParams != nil {
+ if len(params.ErasureCodingParams.Destinations) > 0 {
+ destNode = params.ErasureCodingParams.Destinations[0].Node
+ }
+ if params.ErasureCodingParams.EstimatedShardSize > 0 {
+ estimatedSize = params.ErasureCodingParams.EstimatedShardSize
+ }
+ }
+ case *worker_pb.TaskParams_BalanceParams:
+ if params.BalanceParams != nil {
+ destNode = params.BalanceParams.DestNode
+ if params.BalanceParams.EstimatedSize > 0 {
+ estimatedSize = params.BalanceParams.EstimatedSize
+ }
+ }
+ case *worker_pb.TaskParams_ReplicationParams:
+ if params.ReplicationParams != nil {
+ destNode = params.ReplicationParams.DestNode
+ if params.ReplicationParams.EstimatedSize > 0 {
+ estimatedSize = params.ReplicationParams.EstimatedSize
+ }
+ }
+ }
+
+ operation := &PendingOperation{
+ VolumeID: task.VolumeID,
+ OperationType: opType,
+ SourceNode: task.Server,
+ DestNode: destNode,
+ TaskID: task.ID,
+ StartTime: time.Now(),
+ EstimatedSize: estimatedSize,
+ Collection: task.Collection,
+ Status: "assigned",
+ }
+
+ pendingOps.AddOperation(operation)
+}
+
+// removePendingOperation removes a task from the pending operations tracker
+func (mq *MaintenanceQueue) removePendingOperation(taskID string) {
+ if mq.integration == nil {
+ return
+ }
+
+ pendingOps := mq.integration.GetPendingOperations()
+ if pendingOps == nil {
+ return
+ }
+
+ pendingOps.RemoveOperation(taskID)
+}
+
+// updatePendingOperationStatus updates the status of a pending operation
+func (mq *MaintenanceQueue) updatePendingOperationStatus(taskID string, status string) {
+ if mq.integration == nil {
+ return
+ }
+
+ pendingOps := mq.integration.GetPendingOperations()
+ if pendingOps == nil {
+ return
+ }
+
+ pendingOps.UpdateOperationStatus(taskID, status)
+}