Diffstat (limited to 'weed/worker/worker.go')
| -rw-r--r-- | weed/worker/worker.go | 523 |
1 file changed, 370 insertions, 153 deletions
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index 0763fdc2e..afc203318 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -23,20 +23,56 @@ import (
 // Worker represents a maintenance worker instance
 type Worker struct {
-	id              string
-	config          *types.WorkerConfig
-	registry        *tasks.TaskRegistry
-	currentTasks    map[string]*types.TaskInput
-	adminClient     AdminClient
+	id             string
+	config         *types.WorkerConfig
+	registry       *tasks.TaskRegistry
+	cmds           chan workerCommand
+	state          *workerState
+	taskLogHandler *tasks.TaskLogHandler
+	closeOnce      sync.Once
+}
+type workerState struct {
 	running         bool
-	stopChan        chan struct{}
-	mutex           sync.RWMutex
+	adminClient     AdminClient
 	startTime       time.Time
-	tasksCompleted  int
-	tasksFailed     int
+	stopChan        chan struct{}
 	heartbeatTicker *time.Ticker
 	requestTicker   *time.Ticker
-	taskLogHandler  *tasks.TaskLogHandler
+	currentTasks    map[string]*types.TaskInput
+	tasksCompleted  int
+	tasksFailed     int
+}
+
+type workerAction string
+
+const (
+	ActionStart             workerAction = "start"
+	ActionStop              workerAction = "stop"
+	ActionGetStatus         workerAction = "getstatus"
+	ActionGetTaskLoad       workerAction = "getload"
+	ActionSetTask           workerAction = "settask"
+	ActionSetAdmin          workerAction = "setadmin"
+	ActionRemoveTask        workerAction = "removetask"
+	ActionGetAdmin          workerAction = "getadmin"
+	ActionIncTaskFail       workerAction = "inctaskfail"
+	ActionIncTaskComplete   workerAction = "inctaskcomplete"
+	ActionGetHbTick         workerAction = "gethbtick"
+	ActionGetReqTick        workerAction = "getreqtick"
+	ActionGetStopChan       workerAction = "getstopchan"
+	ActionSetHbTick         workerAction = "sethbtick"
+	ActionSetReqTick        workerAction = "setreqtick"
+	ActionGetStartTime      workerAction = "getstarttime"
+	ActionGetCompletedTasks workerAction = "getcompletedtasks"
+	ActionGetFailedTasks    workerAction = "getfailedtasks"
+	ActionCancelTask        workerAction = "canceltask"
+	// ... other worker actions like Stop, Status, etc.
+)
+
+type statusResponse chan types.WorkerStatus
+type workerCommand struct {
+	action workerAction
+	data   any
+	resp   chan error // for reporting success/failure
 }
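The shape of the change is already visible in this first hunk: every mutable field moves out of Worker into workerState, which only a single manager goroutine touches, and callers reach that state through workerCommand values sent on cmds. A minimal, self-contained sketch of this actor-style pattern (names here are illustrative, not from the commit):

```go
package main

import "fmt"

type action string

const (
	actionInc action = "inc"
	actionGet action = "get"
)

type command struct {
	action action
	data   any // for reads, carries the caller's response channel
}

// managerLoop is the only goroutine that ever touches count,
// so no mutex is needed.
func managerLoop(cmds <-chan command) {
	count := 0
	for cmd := range cmds {
		switch cmd.action {
		case actionInc:
			count++
		case actionGet:
			cmd.data.(chan int) <- count
		}
	}
}

func main() {
	cmds := make(chan command)
	go managerLoop(cmds)

	cmds <- command{action: actionInc}

	respCh := make(chan int, 1) // buffered so the manager never blocks on reply
	cmds <- command{action: actionGet, data: respCh}
	fmt.Println(<-respCh) // prints 1
}
```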
 
 // AdminClient defines the interface for communicating with the admin server
@@ -150,17 +186,222 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) {
 		id:             workerID,
 		config:         config,
 		registry:       registry,
-		currentTasks:   make(map[string]*types.TaskInput),
-		stopChan:       make(chan struct{}),
-		startTime:      time.Now(),
 		taskLogHandler: taskLogHandler,
+		cmds:           make(chan workerCommand),
 	}
 
 	glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetAll()))
-
+	go worker.managerLoop()
 	return worker, nil
 }
 
+func (w *Worker) managerLoop() {
+	w.state = &workerState{
+		startTime:    time.Now(),
+		stopChan:     make(chan struct{}),
+		currentTasks: make(map[string]*types.TaskInput),
+	}
+
+	for cmd := range w.cmds {
+		switch cmd.action {
+		case ActionStart:
+			w.handleStart(cmd)
+		case ActionStop:
+			w.handleStop(cmd)
+		case ActionGetStatus:
+			respCh := cmd.data.(statusResponse)
+			var currentTasks []types.TaskInput
+			for _, task := range w.state.currentTasks {
+				currentTasks = append(currentTasks, *task)
+			}
+
+			statusStr := "active"
+			if len(w.state.currentTasks) >= w.config.MaxConcurrent {
+				statusStr = "busy"
+			}
+
+			status := types.WorkerStatus{
+				WorkerID:       w.id,
+				Status:         statusStr,
+				Capabilities:   w.config.Capabilities,
+				MaxConcurrent:  w.config.MaxConcurrent,
+				CurrentLoad:    len(w.state.currentTasks),
+				LastHeartbeat:  time.Now(),
+				CurrentTasks:   currentTasks,
+				Uptime:         time.Since(w.state.startTime),
+				TasksCompleted: w.state.tasksCompleted,
+				TasksFailed:    w.state.tasksFailed,
+			}
+			respCh <- status
+		case ActionGetTaskLoad:
+			respCh := cmd.data.(chan int)
+			respCh <- len(w.state.currentTasks)
+		case ActionSetTask:
+			currentLoad := len(w.state.currentTasks)
+			if currentLoad >= w.config.MaxConcurrent {
+				cmd.resp <- fmt.Errorf("worker is at capacity")
+			}
+			task := cmd.data.(*types.TaskInput)
+			w.state.currentTasks[task.ID] = task
+			cmd.resp <- nil
+		case ActionSetAdmin:
+			admin := cmd.data.(AdminClient)
+			w.state.adminClient = admin
+		case ActionRemoveTask:
+			taskID := cmd.data.(string)
+			delete(w.state.currentTasks, taskID)
+		case ActionGetAdmin:
+			respCh := cmd.data.(chan AdminClient)
+			respCh <- w.state.adminClient
+		case ActionIncTaskFail:
+			w.state.tasksFailed++
+		case ActionIncTaskComplete:
+			w.state.tasksCompleted++
+		case ActionGetHbTick:
+			respCh := cmd.data.(chan *time.Ticker)
+			respCh <- w.state.heartbeatTicker
+		case ActionGetReqTick:
+			respCh := cmd.data.(chan *time.Ticker)
+			respCh <- w.state.requestTicker
+		case ActionSetHbTick:
+			w.state.heartbeatTicker = cmd.data.(*time.Ticker)
+		case ActionSetReqTick:
+			w.state.requestTicker = cmd.data.(*time.Ticker)
+		case ActionGetStopChan:
+			cmd.data.(chan chan struct{}) <- w.state.stopChan
+		case ActionGetStartTime:
+			cmd.data.(chan time.Time) <- w.state.startTime
+		case ActionGetCompletedTasks:
+			cmd.data.(chan int) <- w.state.tasksCompleted
+		case ActionGetFailedTasks:
+			cmd.data.(chan int) <- w.state.tasksFailed
+		case ActionCancelTask:
+			taskID := cmd.data.(string)
+			if task, exists := w.state.currentTasks[taskID]; exists {
+				glog.Infof("Cancelling task %s", task.ID)
+				// TODO: Implement actual task cancellation logic
+			} else {
+				glog.Warningf("Cannot cancel task %s: task not found", taskID)
+			}
+
+		}
+	}
+}
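One branch above deserves a flag: ActionSetTask sends the capacity error but then neither continues nor returns, so it still stores the task and sends a second value on cmd.resp. Since the caller (setTask, below) receives exactly once from an unbuffered channel, that second send would wedge the manager loop. The toy below shows the guarded, reply-exactly-once version of the same branch with stand-in types (an editor's sketch, not code from the commit):

```go
package main

import (
	"errors"
	"fmt"
)

type command struct {
	data string
	resp chan error
}

// handle mirrors a guarded ActionSetTask: reply exactly once per command,
// and skip the store entirely when at capacity.
func handle(cmds <-chan command, maxConcurrent int) {
	tasks := map[string]bool{}
	for cmd := range cmds {
		if len(tasks) >= maxConcurrent {
			// Without this continue the loop would fall through, store the
			// task anyway, and block on a second send that nobody receives.
			cmd.resp <- errors.New("worker is at capacity")
			continue
		}
		tasks[cmd.data] = true
		cmd.resp <- nil
	}
}

func main() {
	cmds := make(chan command)
	go handle(cmds, 1)
	for _, id := range []string{"task-1", "task-2"} {
		resp := make(chan error)
		cmds <- command{data: id, resp: resp}
		fmt.Println(id, "->", <-resp) // task-2 is rejected at capacity 1
	}
}
```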
+
+func (w *Worker) getTaskLoad() int {
+	respCh := make(chan int, 1)
+	w.cmds <- workerCommand{
+		action: ActionGetTaskLoad,
+		data:   respCh,
+		resp:   nil,
+	}
+	return <-respCh
+}
+
+func (w *Worker) setTask(task *types.TaskInput) error {
+	resp := make(chan error)
+	w.cmds <- workerCommand{
+		action: ActionSetTask,
+		data:   task,
+		resp:   resp,
+	}
+	if err := <-resp; err != nil {
+		glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
+			w.id, w.getTaskLoad(), w.config.MaxConcurrent, task.ID)
+		return err
+	}
+	newLoad := w.getTaskLoad()
+
+	glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
+		w.id, task.ID, newLoad, w.config.MaxConcurrent)
+	return nil
+}
+
+func (w *Worker) removeTask(task *types.TaskInput) int {
+	w.cmds <- workerCommand{
+		action: ActionRemoveTask,
+		data:   task.ID,
+	}
+	return w.getTaskLoad()
+}
+
+func (w *Worker) getAdmin() AdminClient {
+	respCh := make(chan AdminClient, 1)
+	w.cmds <- workerCommand{
+		action: ActionGetAdmin,
+		data:   respCh,
+	}
+	return <-respCh
+}
+
+func (w *Worker) getStopChan() chan struct{} {
+	respCh := make(chan chan struct{}, 1)
+	w.cmds <- workerCommand{
+		action: ActionGetStopChan,
+		data:   respCh,
+	}
+	return <-respCh
+}
+
+func (w *Worker) getHbTick() *time.Ticker {
+	respCh := make(chan *time.Ticker, 1)
+	w.cmds <- workerCommand{
+		action: ActionGetHbTick,
+		data:   respCh,
+	}
+	return <-respCh
+}
+
+func (w *Worker) getReqTick() *time.Ticker {
+	respCh := make(chan *time.Ticker, 1)
+	w.cmds <- workerCommand{
+		action: ActionGetReqTick,
+		data:   respCh,
+	}
+	return <-respCh
+}
+
+func (w *Worker) setHbTick(tick *time.Ticker) *time.Ticker {
+	w.cmds <- workerCommand{
+		action: ActionSetHbTick,
+		data:   tick,
+	}
+	return w.getHbTick()
+}
+
+func (w *Worker) setReqTick(tick *time.Ticker) *time.Ticker {
+	w.cmds <- workerCommand{
+		action: ActionSetReqTick,
+		data:   tick,
+	}
+	return w.getReqTick()
+}
+
+func (w *Worker) getStartTime() time.Time {
+	respCh := make(chan time.Time, 1)
+	w.cmds <- workerCommand{
+		action: ActionGetStartTime,
+		data:   respCh,
+	}
+	return <-respCh
+}
+func (w *Worker) getCompletedTasks() int {
+	respCh := make(chan int, 1)
+	w.cmds <- workerCommand{
+		action: ActionGetCompletedTasks,
+		data:   respCh,
+	}
+	return <-respCh
+}
+func (w *Worker) getFailedTasks() int {
+	respCh := make(chan int, 1)
+	w.cmds <- workerCommand{
+		action: ActionGetFailedTasks,
+		data:   respCh,
+	}
+	return <-respCh
+}
+
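Most of the accessors above are the same request/reply round trip with a different payload type. If the maintainers wanted to shrink the boilerplate, a single generic helper could cover every read path that passes a plain channel (GetStatus uses the named statusResponse type and would keep its own path, since a type assertion requires the exact dynamic type). This is an editor's sketch against the diff's own types; `request` is a hypothetical name, not in the commit:

```go
// Hypothetical consolidation of the read-side accessors (not in the commit).
// The buffered reply channel keeps the manager loop from blocking on send,
// exactly as the hand-written helpers above do.
func request[T any](cmds chan<- workerCommand, action workerAction) T {
	respCh := make(chan T, 1)
	cmds <- workerCommand{action: action, data: respCh}
	return <-respCh
}

// Usage, e.g. getTaskLoad would reduce to:
// func (w *Worker) getTaskLoad() int { return request[int](w.cmds, ActionGetTaskLoad) }
```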
 // getTaskLoggerConfig returns the task logger configuration with worker's log directory
 func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig {
 	config := tasks.DefaultTaskLoggerConfig()
@@ -177,21 +418,29 @@ func (w *Worker) ID() string {
 	return w.id
 }
 
-// Start starts the worker
 func (w *Worker) Start() error {
-	w.mutex.Lock()
-	defer w.mutex.Unlock()
+	resp := make(chan error)
+	w.cmds <- workerCommand{
+		action: ActionStart,
+		resp:   resp,
+	}
+	return <-resp
+}
 
-	if w.running {
-		return fmt.Errorf("worker is already running")
+// Start starts the worker
+func (w *Worker) handleStart(cmd workerCommand) {
+	if w.state.running {
+		cmd.resp <- fmt.Errorf("worker is already running")
+		return
 	}
 
-	if w.adminClient == nil {
-		return fmt.Errorf("admin client is not set")
+	if w.state.adminClient == nil {
+		cmd.resp <- fmt.Errorf("admin client is not set")
+		return
 	}
 
-	w.running = true
-	w.startTime = time.Now()
+	w.state.running = true
+	w.state.startTime = time.Now()
 
 	// Prepare worker info for registration
 	workerInfo := &types.WorkerData{
@@ -204,7 +453,7 @@ func (w *Worker) Start() error {
 	}
 
 	// Register worker info with client first (this stores it for use during connection)
-	if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
+	if err := w.state.adminClient.RegisterWorker(workerInfo); err != nil {
 		glog.V(1).Infof("Worker info stored for registration: %v", err)
 		// This is expected if not connected yet
 	}
@@ -214,7 +463,7 @@
 		w.id, w.config.Capabilities, w.config.MaxConcurrent)
 
 	// Try initial connection, but don't fail if it doesn't work immediately
-	if err := w.adminClient.Connect(); err != nil {
+	if err := w.state.adminClient.Connect(); err != nil {
 		glog.Warningf("INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err)
 		// Don't return error - let the reconnection loop handle it
 	} else {
@@ -230,54 +479,67 @@
 	go w.messageProcessingLoop()
 
 	glog.Infof("WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id)
-	return nil
+	cmd.resp <- nil
 }
 
-// Stop stops the worker
 func (w *Worker) Stop() error {
-	w.mutex.Lock()
-	defer w.mutex.Unlock()
-
-	if !w.running {
-		return nil
-	}
-
-	w.running = false
-	close(w.stopChan)
-
-	// Stop tickers
-	if w.heartbeatTicker != nil {
-		w.heartbeatTicker.Stop()
+	resp := make(chan error)
+	w.cmds <- workerCommand{
+		action: ActionStop,
+		resp:   resp,
 	}
-	if w.requestTicker != nil {
-		w.requestTicker.Stop()
+	if err := <-resp; err != nil {
+		return err
 	}
 
-	// Wait for current tasks to complete or timeout
+	// Wait for tasks to finish
 	timeout := time.NewTimer(30 * time.Second)
 	defer timeout.Stop()
-
-	for len(w.currentTasks) > 0 {
+	for w.getTaskLoad() > 0 {
 		select {
 		case <-timeout.C:
-			glog.Warningf("Worker %s stopping with %d tasks still running", w.id, len(w.currentTasks))
-			break
-		case <-time.After(time.Second):
-			// Check again
+			glog.Warningf("Worker %s stopping with %d tasks still running", w.id, w.getTaskLoad())
+			goto end_wait
+		case <-time.After(100 * time.Millisecond):
 		}
 	}
+end_wait:
 
 	// Disconnect from admin server
-	if w.adminClient != nil {
-		if err := w.adminClient.Disconnect(); err != nil {
+	if adminClient := w.getAdmin(); adminClient != nil {
+		if err := adminClient.Disconnect(); err != nil {
 			glog.Errorf("Error disconnecting from admin server: %v", err)
 		}
 	}
 
+	w.closeOnce.Do(func() {
+		close(w.cmds)
+	})
 	glog.Infof("Worker %s stopped", w.id)
 	return nil
 }
 
+// Stop stops the worker
+func (w *Worker) handleStop(cmd workerCommand) {
+	if !w.state.running {
+		cmd.resp <- nil
+		return
+	}
+
+	w.state.running = false
+	close(w.state.stopChan)
+
+	// Stop tickers
+	if w.state.heartbeatTicker != nil {
+		w.state.heartbeatTicker.Stop()
+	}
+	if w.state.requestTicker != nil {
+		w.state.requestTicker.Stop()
+	}
+
+	cmd.resp <- nil
+}
+
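The old Stop carried a classic Go bug that this hunk fixes: inside a select, a bare `break` only exits the select statement, not the surrounding for loop, so the old timeout branch never actually left the wait loop. The commit reaches for `goto end_wait`; a labeled break is the other idiomatic way out, shown here as a stand-alone, runnable example:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	timeout := time.NewTimer(50 * time.Millisecond)
	defer timeout.Stop()

wait:
	for {
		select {
		case <-timeout.C:
			fmt.Println("timed out")
			break wait // a bare break here would only exit the select
		case <-time.After(10 * time.Millisecond):
			fmt.Println("still waiting")
		}
	}
	fmt.Println("done")
}
```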
 // RegisterTask registers a task factory
 func (w *Worker) RegisterTask(taskType types.TaskType, factory types.TaskFactory) {
 	w.registry.Register(taskType, factory)
@@ -290,31 +552,13 @@
 
 // GetStatus returns the current worker status
 func (w *Worker) GetStatus() types.WorkerStatus {
-	w.mutex.RLock()
-	defer w.mutex.RUnlock()
-
-	var currentTasks []types.TaskInput
-	for _, task := range w.currentTasks {
-		currentTasks = append(currentTasks, *task)
-	}
-
-	status := "active"
-	if len(w.currentTasks) >= w.config.MaxConcurrent {
-		status = "busy"
-	}
-
-	return types.WorkerStatus{
-		WorkerID:       w.id,
-		Status:         status,
-		Capabilities:   w.config.Capabilities,
-		MaxConcurrent:  w.config.MaxConcurrent,
-		CurrentLoad:    len(w.currentTasks),
-		LastHeartbeat:  time.Now(),
-		CurrentTasks:   currentTasks,
-		Uptime:         time.Since(w.startTime),
-		TasksCompleted: w.tasksCompleted,
-		TasksFailed:    w.tasksFailed,
+	respCh := make(statusResponse, 1)
+	w.cmds <- workerCommand{
+		action: ActionGetStatus,
+		data:   respCh,
+		resp:   nil,
 	}
+	return <-respCh
 }
 
 // HandleTask handles a task execution
@@ -322,22 +566,10 @@ func (w *Worker) HandleTask(task *types.TaskInput) error {
 	glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)",
 		w.id, task.ID, task.Type, task.VolumeID)
 
-	w.mutex.Lock()
-	currentLoad := len(w.currentTasks)
-	if currentLoad >= w.config.MaxConcurrent {
-		w.mutex.Unlock()
-		glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
-			w.id, currentLoad, w.config.MaxConcurrent, task.ID)
-		return fmt.Errorf("worker is at capacity")
+	if err := w.setTask(task); err != nil {
+		return err
 	}
 
-	w.currentTasks[task.ID] = task
-	newLoad := len(w.currentTasks)
-	w.mutex.Unlock()
-
-	glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
-		w.id, task.ID, newLoad, w.config.MaxConcurrent)
-
 	// Execute task in goroutine
 	go w.executeTask(task)
 
@@ -366,7 +598,10 @@
 
 // SetAdminClient sets the admin client
 func (w *Worker) SetAdminClient(client AdminClient) {
-	w.adminClient = client
+	w.cmds <- workerCommand{
+		action: ActionSetAdmin,
+		data:   client,
+	}
 }
 
 // executeTask executes a task
@@ -374,10 +609,7 @@
 	startTime := time.Now()
 
 	defer func() {
-		w.mutex.Lock()
-		delete(w.currentTasks, task.ID)
-		currentLoad := len(w.currentTasks)
-		w.mutex.Unlock()
+		currentLoad := w.removeTask(task)
 
 		duration := time.Since(startTime)
 		glog.Infof("TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d",
@@ -388,7 +620,7 @@
 		w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339))
 
 	// Report task start to admin server
-	if err := w.adminClient.UpdateTaskProgress(task.ID, 0.0); err != nil {
+	if err := w.getAdmin().UpdateTaskProgress(task.ID, 0.0); err != nil {
 		glog.V(1).Infof("Failed to report task start to admin: %v", err)
 	}
 
@@ -461,7 +693,7 @@
 	taskInstance.SetProgressCallback(func(progress float64, stage string) {
 		// Report progress updates to admin server
 		glog.V(2).Infof("Task %s progress: %.1f%% - %s", task.ID, progress, stage)
-		if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil {
+		if err := w.getAdmin().UpdateTaskProgress(task.ID, progress); err != nil {
 			glog.V(1).Infof("Failed to report task progress to admin: %v", err)
 		}
 		if fileLogger != nil {
@@ -481,7 +713,9 @@
 	// Report completion
 	if err != nil {
 		w.completeTask(task.ID, false, err.Error())
-		w.tasksFailed++
+		w.cmds <- workerCommand{
+			action: ActionIncTaskFail,
+		}
 		glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
 		if fileLogger != nil {
 			fileLogger.LogStatus("failed", err.Error())
@@ -489,18 +723,21 @@
 		}
 	} else {
 		w.completeTask(task.ID, true, "")
-		w.tasksCompleted++
+		w.cmds <- workerCommand{
+			action: ActionIncTaskComplete,
+		}
 		glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
 		if fileLogger != nil {
 			fileLogger.Info("Task %s completed successfully", task.ID)
 		}
 	}
+	return
 }
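Note how executeTask wires progress reporting through SetProgressCallback: the task never sees the admin client directly, it just calls a closure the worker hands it. Stripped of the worker types, the shape of that wiring looks like this (stand-in types, not the commit's code):

```go
package main

import "fmt"

type task struct {
	onProgress func(progress float64, stage string)
}

// run reports through the injected callback, oblivious to where it goes.
func (t *task) run() {
	for i, stage := range []string{"scan", "copy", "verify"} {
		t.onProgress(float64(i+1)/3*100, stage)
	}
}

func main() {
	t := &task{}
	t.onProgress = func(p float64, stage string) {
		// the real worker forwards this to adminClient.UpdateTaskProgress
		fmt.Printf("progress: %.1f%% - %s\n", p, stage)
	}
	t.run()
}
```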
 
 // completeTask reports task completion to admin server
 func (w *Worker) completeTask(taskID string, success bool, errorMsg string) {
-	if w.adminClient != nil {
-		if err := w.adminClient.CompleteTask(taskID, success, errorMsg); err != nil {
+	if w.getAdmin() != nil {
+		if err := w.getAdmin().CompleteTask(taskID, success, errorMsg); err != nil {
 			glog.Errorf("Failed to report task completion: %v", err)
 		}
 	}
@@ -508,14 +745,14 @@
 
 // heartbeatLoop sends periodic heartbeats to the admin server
 func (w *Worker) heartbeatLoop() {
-	w.heartbeatTicker = time.NewTicker(w.config.HeartbeatInterval)
-	defer w.heartbeatTicker.Stop()
-
+	defer w.setHbTick(time.NewTicker(w.config.HeartbeatInterval)).Stop()
+	ticker := w.getHbTick()
+	stopChan := w.getStopChan()
 	for {
 		select {
-		case <-w.stopChan:
+		case <-stopChan:
 			return
-		case <-w.heartbeatTicker.C:
+		case <-ticker.C:
 			w.sendHeartbeat()
 		}
 	}
@@ -523,14 +760,14 @@
 
 // taskRequestLoop periodically requests new tasks from the admin server
 func (w *Worker) taskRequestLoop() {
-	w.requestTicker = time.NewTicker(w.config.TaskRequestInterval)
-	defer w.requestTicker.Stop()
-
+	defer w.setReqTick(time.NewTicker(w.config.TaskRequestInterval)).Stop()
+	ticker := w.getReqTick()
+	stopChan := w.getStopChan()
 	for {
 		select {
-		case <-w.stopChan:
+		case <-stopChan:
 			return
-		case <-w.requestTicker.C:
+		case <-ticker.C:
 			w.requestTasks()
 		}
 	}
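The rewritten loops open with a line that is easy to misread: in `defer w.setHbTick(time.NewTicker(...)).Stop()`, everything except the final `.Stop()` call is evaluated immediately when the defer statement executes, so the ticker is created and registered with the manager up front and only Stop runs at function exit. A stand-alone illustration of that evaluation rule:

```go
package main

import "fmt"

type ticker struct{ name string }

func (t *ticker) Stop() { fmt.Println("stopped", t.name) }

func register(t *ticker) *ticker {
	fmt.Println("registered", t.name) // runs right away, not at defer time
	return t
}

func main() {
	defer register(&ticker{name: "heartbeat"}).Stop() // only Stop is deferred
	fmt.Println("looping")
}

// Output:
// registered heartbeat
// looping
// stopped heartbeat
```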
@@ -538,13 +775,13 @@
 
 // sendHeartbeat sends heartbeat to admin server
 func (w *Worker) sendHeartbeat() {
-	if w.adminClient != nil {
-		if err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{
+	if w.getAdmin() != nil {
+		if err := w.getAdmin().SendHeartbeat(w.id, &types.WorkerStatus{
 			WorkerID:      w.id,
 			Status:        "active",
 			Capabilities:  w.config.Capabilities,
 			MaxConcurrent: w.config.MaxConcurrent,
-			CurrentLoad:   len(w.currentTasks),
+			CurrentLoad:   w.getTaskLoad(),
 			LastHeartbeat: time.Now(),
 		}); err != nil {
 			glog.Warningf("Failed to send heartbeat: %v", err)
@@ -554,9 +791,7 @@
 
 // requestTasks requests new tasks from the admin server
 func (w *Worker) requestTasks() {
-	w.mutex.RLock()
-	currentLoad := len(w.currentTasks)
-	w.mutex.RUnlock()
+	currentLoad := w.getTaskLoad()
 
 	if currentLoad >= w.config.MaxConcurrent {
 		glog.V(3).Infof("TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)",
@@ -564,11 +799,11 @@
 		return // Already at capacity
 	}
 
-	if w.adminClient != nil {
+	if w.getAdmin() != nil {
 		glog.V(3).Infof("REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)",
 			w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities)
 
-		task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities)
+		task, err := w.getAdmin().RequestTask(w.id, w.config.Capabilities)
 		if err != nil {
 			glog.V(2).Infof("TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err)
 			return
@@ -591,18 +826,6 @@
 	return w.registry
 }
 
-// GetCurrentTasks returns the current tasks
-func (w *Worker) GetCurrentTasks() map[string]*types.TaskInput {
-	w.mutex.RLock()
-	defer w.mutex.RUnlock()
-
-	tasks := make(map[string]*types.TaskInput)
-	for id, task := range w.currentTasks {
-		tasks[id] = task
-	}
-	return tasks
-}
-
 // registerWorker registers the worker with the admin server
 func (w *Worker) registerWorker() {
 	workerInfo := &types.WorkerData{
@@ -614,7 +837,7 @@ func (w *Worker) registerWorker() {
 		LastHeartbeat: time.Now(),
 	}
 
-	if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
+	if err := w.getAdmin().RegisterWorker(workerInfo); err != nil {
 		glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err)
 	} else {
 		glog.Infof("Worker %s registered successfully with admin server", w.id)
@@ -627,15 +850,15 @@
 	defer ticker.Stop()
 
 	lastConnectionStatus := false
-
+	stopChan := w.getStopChan()
 	for {
 		select {
-		case <-w.stopChan:
+		case <-stopChan:
 			glog.V(1).Infof("CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id)
 			return
 		case <-ticker.C:
 			// Monitor connection status and log changes
-			currentConnectionStatus := w.adminClient != nil && w.adminClient.IsConnected()
+			currentConnectionStatus := w.getAdmin() != nil && w.getAdmin().IsConnected()
 
 			if currentConnectionStatus != lastConnectionStatus {
 				if currentConnectionStatus {
@@ -662,19 +885,17 @@
 
 // GetPerformanceMetrics returns performance metrics
 func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
-	w.mutex.RLock()
-	defer w.mutex.RUnlock()
-	uptime := time.Since(w.startTime)
+	uptime := time.Since(w.getStartTime())
 	var successRate float64
-	totalTasks := w.tasksCompleted + w.tasksFailed
+	totalTasks := w.getCompletedTasks() + w.getFailedTasks()
 	if totalTasks > 0 {
-		successRate = float64(w.tasksCompleted) / float64(totalTasks) * 100
+		successRate = float64(w.getCompletedTasks()) / float64(totalTasks) * 100
 	}
 
 	return &types.WorkerPerformance{
-		TasksCompleted: w.tasksCompleted,
-		TasksFailed:    w.tasksFailed,
+		TasksCompleted:  w.getCompletedTasks(),
+		TasksFailed:     w.getFailedTasks(),
 		AverageTaskTime: 0, // Would need to track this
 		Uptime:          uptime,
 		SuccessRate:     successRate,
@@ -686,7 +907,7 @@
 	glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)
 
 	// Get access to the incoming message channel from gRPC client
-	grpcClient, ok := w.adminClient.(*GrpcAdminClient)
+	grpcClient, ok := w.getAdmin().(*GrpcAdminClient)
 	if !ok {
 		glog.Warningf("MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id)
 		return
@@ -694,10 +915,10 @@
 
 	incomingChan := grpcClient.GetIncomingChannel()
 	glog.V(1).Infof("MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id)
-
+	stopChan := w.getStopChan()
 	for {
 		select {
-		case <-w.stopChan:
+		case <-stopChan:
 			glog.Infof("MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id)
 			return
 		case message := <-incomingChan:
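messageProcessingLoop only works when the AdminClient interface holds the concrete gRPC implementation, and the comma-ok type assertion lets any other implementation degrade to a log line instead of a panic. The pattern in isolation, with stand-in types:

```go
package main

import "fmt"

type AdminClient interface{ Name() string }

type GrpcAdminClient struct{}

func (g *GrpcAdminClient) Name() string { return "grpc" }

type mockAdminClient struct{}

func (m *mockAdminClient) Name() string { return "mock" }

func describe(c AdminClient) {
	// comma-ok form: ok is false instead of panicking on a mismatch
	if g, ok := c.(*GrpcAdminClient); ok {
		fmt.Println("gRPC features available:", g.Name())
		return
	}
	fmt.Println("not a gRPC client; message processing unavailable")
}

func main() {
	describe(&GrpcAdminClient{})
	describe(&mockAdminClient{})
}
```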
@@ -773,7 +994,7 @@
 		},
 	}
 
-	grpcClient, ok := w.adminClient.(*GrpcAdminClient)
+	grpcClient, ok := w.getAdmin().(*GrpcAdminClient)
 	if !ok {
 		glog.Errorf("Cannot send task log response: admin client is not gRPC client")
 		return
@@ -791,14 +1012,10 @@
 func (w *Worker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) {
 	glog.Infof("Worker %s received task cancellation for task %s", w.id, cancellation.TaskId)
 
-	w.mutex.Lock()
-	defer w.mutex.Unlock()
-
-	if task, exists := w.currentTasks[cancellation.TaskId]; exists {
-		// TODO: Implement task cancellation logic
-		glog.Infof("Cancelling task %s", task.ID)
-	} else {
-		glog.Warningf("Cannot cancel task %s: task not found", cancellation.TaskId)
+	w.cmds <- workerCommand{
+		action: ActionCancelTask,
+		data:   cancellation.TaskId,
+		resp:   nil,
+	}
 }
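Even after this refactor, ActionCancelTask still ends in a TODO on the manager side. One conventional way to make it real would be to keep a context.CancelFunc per running task and invoke it from that branch; the following is a speculative sketch of that approach, not what this commit implements:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// the manager could hold this map next to currentTasks
	cancels := map[string]context.CancelFunc{}

	ctx, cancel := context.WithCancel(context.Background())
	cancels["task-1"] = cancel

	go func() { // stand-in for executeTask honoring its context
		select {
		case <-ctx.Done():
			fmt.Println("task-1 cancelled:", ctx.Err())
		case <-time.After(time.Second):
			fmt.Println("task-1 finished")
		}
	}()

	// what the ActionCancelTask branch could do instead of the TODO
	if cancel, ok := cancels["task-1"]; ok {
		cancel()
		delete(cancels, "task-1")
	}
	time.Sleep(100 * time.Millisecond) // let the goroutine report
}
```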
