aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/dash/worker_grpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/dash/worker_grpc_server.go')
-rw-r--r--weed/admin/dash/worker_grpc_server.go34
1 files changed, 25 insertions, 9 deletions
diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go
index 36f97261a..3b4312235 100644
--- a/weed/admin/dash/worker_grpc_server.go
+++ b/weed/admin/dash/worker_grpc_server.go
@@ -319,27 +319,41 @@ func (s *WorkerGrpcServer) handleHeartbeat(conn *WorkerConnection, heartbeat *wo
// handleTaskRequest processes task requests from workers
func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *worker_pb.TaskRequest) {
+ // glog.Infof("DEBUG handleTaskRequest: Worker %s requesting tasks with capabilities %v", conn.workerID, conn.capabilities)
+
if s.adminServer.maintenanceManager == nil {
+ glog.Infof("DEBUG handleTaskRequest: maintenance manager is nil")
return
}
// Get next task from maintenance manager
task := s.adminServer.maintenanceManager.GetNextTask(conn.workerID, conn.capabilities)
+ // glog.Infof("DEBUG handleTaskRequest: GetNextTask returned task: %v", task != nil)
if task != nil {
+ glog.Infof("DEBUG handleTaskRequest: Assigning task %s (type: %s) to worker %s", task.ID, task.Type, conn.workerID)
+
+ // Use typed params directly - master client should already be configured in the params
+ var taskParams *worker_pb.TaskParams
+ if task.TypedParams != nil {
+ taskParams = task.TypedParams
+ } else {
+ // Create basic params if none exist
+ taskParams = &worker_pb.TaskParams{
+ VolumeId: task.VolumeID,
+ Server: task.Server,
+ Collection: task.Collection,
+ }
+ }
+
// Send task assignment
assignment := &worker_pb.AdminMessage{
Timestamp: time.Now().Unix(),
Message: &worker_pb.AdminMessage_TaskAssignment{
TaskAssignment: &worker_pb.TaskAssignment{
- TaskId: task.ID,
- TaskType: string(task.Type),
- Params: &worker_pb.TaskParams{
- VolumeId: task.VolumeID,
- Server: task.Server,
- Collection: task.Collection,
- Parameters: convertTaskParameters(task.Parameters),
- },
+ TaskId: task.ID,
+ TaskType: string(task.Type),
+ Params: taskParams,
Priority: int32(task.Priority),
CreatedTime: time.Now().Unix(),
},
@@ -348,10 +362,12 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo
select {
case conn.outgoing <- assignment:
- glog.V(2).Infof("Assigned task %s to worker %s", task.ID, conn.workerID)
+ glog.Infof("DEBUG handleTaskRequest: Successfully assigned task %s to worker %s", task.ID, conn.workerID)
case <-time.After(time.Second):
glog.Warningf("Failed to send task assignment to worker %s", conn.workerID)
}
+ } else {
+ // glog.Infof("DEBUG handleTaskRequest: No tasks available for worker %s", conn.workerID)
}
}