diff options
Diffstat (limited to 'weed/admin/dash/worker_grpc_server.go')
| -rw-r--r-- | weed/admin/dash/worker_grpc_server.go | 34 |
1 files changed, 25 insertions, 9 deletions
diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index 36f97261a..3b4312235 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -319,27 +319,41 @@ func (s *WorkerGrpcServer) handleHeartbeat(conn *WorkerConnection, heartbeat *wo // handleTaskRequest processes task requests from workers func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *worker_pb.TaskRequest) { + // glog.Infof("DEBUG handleTaskRequest: Worker %s requesting tasks with capabilities %v", conn.workerID, conn.capabilities) + if s.adminServer.maintenanceManager == nil { + glog.Infof("DEBUG handleTaskRequest: maintenance manager is nil") return } // Get next task from maintenance manager task := s.adminServer.maintenanceManager.GetNextTask(conn.workerID, conn.capabilities) + // glog.Infof("DEBUG handleTaskRequest: GetNextTask returned task: %v", task != nil) if task != nil { + glog.Infof("DEBUG handleTaskRequest: Assigning task %s (type: %s) to worker %s", task.ID, task.Type, conn.workerID) + + // Use typed params directly - master client should already be configured in the params + var taskParams *worker_pb.TaskParams + if task.TypedParams != nil { + taskParams = task.TypedParams + } else { + // Create basic params if none exist + taskParams = &worker_pb.TaskParams{ + VolumeId: task.VolumeID, + Server: task.Server, + Collection: task.Collection, + } + } + // Send task assignment assignment := &worker_pb.AdminMessage{ Timestamp: time.Now().Unix(), Message: &worker_pb.AdminMessage_TaskAssignment{ TaskAssignment: &worker_pb.TaskAssignment{ - TaskId: task.ID, - TaskType: string(task.Type), - Params: &worker_pb.TaskParams{ - VolumeId: task.VolumeID, - Server: task.Server, - Collection: task.Collection, - Parameters: convertTaskParameters(task.Parameters), - }, + TaskId: task.ID, + TaskType: string(task.Type), + Params: taskParams, Priority: int32(task.Priority), CreatedTime: time.Now().Unix(), }, @@ -348,10 +362,12 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo select { case conn.outgoing <- assignment: - glog.V(2).Infof("Assigned task %s to worker %s", task.ID, conn.workerID) + glog.Infof("DEBUG handleTaskRequest: Successfully assigned task %s to worker %s", task.ID, conn.workerID) case <-time.After(time.Second): glog.Warningf("Failed to send task assignment to worker %s", conn.workerID) } + } else { + // glog.Infof("DEBUG handleTaskRequest: No tasks available for worker %s", conn.workerID) } } |
