Diffstat (limited to 'weed/admin/dash/worker_grpc_server.go')
-rw-r--r--  weed/admin/dash/worker_grpc_server.go  164
1 file changed, 160 insertions(+), 4 deletions(-)
diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go
index 3b4312235..78ba6d7de 100644
--- a/weed/admin/dash/worker_grpc_server.go
+++ b/weed/admin/dash/worker_grpc_server.go
@@ -26,6 +26,10 @@ type WorkerGrpcServer struct {
connections map[string]*WorkerConnection
connMutex sync.RWMutex
+ // Log request correlation
+ pendingLogRequests map[string]*LogRequestContext
+ logRequestsMutex sync.RWMutex
+
// gRPC server
grpcServer *grpc.Server
listener net.Listener
@@ -33,6 +37,14 @@ type WorkerGrpcServer struct {
stopChan chan struct{}
}
+// LogRequestContext tracks pending log requests
+type LogRequestContext struct {
+ TaskID string
+ WorkerID string
+ ResponseCh chan *worker_pb.TaskLogResponse
+ Timeout time.Time
+}
+
// WorkerConnection represents an active worker connection
type WorkerConnection struct {
workerID string
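
Note: the LogRequestContext added above records a Timeout deadline that this change does not itself consult; pending entries are removed on delivery or on the send/receive timeouts further down. A hedged sketch of how that deadline could be used to sweep abandoned entries (the method name and any calling interval are hypothetical, not part of this change):

func (s *WorkerGrpcServer) sweepExpiredLogRequests() {
	now := time.Now()
	s.logRequestsMutex.Lock()
	defer s.logRequestsMutex.Unlock()
	// Drop correlation entries whose deadline has passed, e.g. because the
	// requesting goroutine gave up or the worker never answered.
	for key, reqCtx := range s.pendingLogRequests {
		if now.After(reqCtx.Timeout) {
			delete(s.pendingLogRequests, key)
		}
	}
}
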
@@ -49,9 +61,10 @@ type WorkerConnection struct {
// NewWorkerGrpcServer creates a new gRPC server for worker connections
func NewWorkerGrpcServer(adminServer *AdminServer) *WorkerGrpcServer {
return &WorkerGrpcServer{
- adminServer: adminServer,
- connections: make(map[string]*WorkerConnection),
- stopChan: make(chan struct{}),
+ adminServer: adminServer,
+ connections: make(map[string]*WorkerConnection),
+ pendingLogRequests: make(map[string]*LogRequestContext),
+ stopChan: make(chan struct{}),
}
}
@@ -264,6 +277,9 @@ func (s *WorkerGrpcServer) handleWorkerMessage(conn *WorkerConnection, msg *work
case *worker_pb.WorkerMessage_TaskComplete:
s.handleTaskCompletion(conn, m.TaskComplete)
+ case *worker_pb.WorkerMessage_TaskLogResponse:
+ s.handleTaskLogResponse(conn, m.TaskLogResponse)
+
case *worker_pb.WorkerMessage_Shutdown:
glog.Infof("Worker %s shutting down: %s", workerID, m.Shutdown.Reason)
s.unregisterWorker(workerID)
@@ -341,8 +357,13 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo
// Create basic params if none exist
taskParams = &worker_pb.TaskParams{
VolumeId: task.VolumeID,
- Server: task.Server,
Collection: task.Collection,
+ Sources: []*worker_pb.TaskSource{
+ {
+ Node: task.Server,
+ VolumeId: task.VolumeID,
+ },
+ },
}
}
@@ -396,6 +417,35 @@ func (s *WorkerGrpcServer) handleTaskCompletion(conn *WorkerConnection, completi
}
}
+// handleTaskLogResponse processes task log responses from workers
+func (s *WorkerGrpcServer) handleTaskLogResponse(conn *WorkerConnection, response *worker_pb.TaskLogResponse) {
+ requestKey := fmt.Sprintf("%s:%s", response.WorkerId, response.TaskId)
+
+ s.logRequestsMutex.RLock()
+ requestContext, exists := s.pendingLogRequests[requestKey]
+ s.logRequestsMutex.RUnlock()
+
+ if !exists {
+ glog.Warningf("Received unexpected log response for task %s from worker %s", response.TaskId, response.WorkerId)
+ return
+ }
+
+ glog.V(1).Infof("Received log response for task %s from worker %s: %d entries", response.TaskId, response.WorkerId, len(response.LogEntries))
+
+ // Send response to waiting channel
+ select {
+ case requestContext.ResponseCh <- response:
+ // Response delivered successfully
+ case <-time.After(time.Second):
+ glog.Warningf("Failed to deliver log response for task %s from worker %s: timeout", response.TaskId, response.WorkerId)
+ }
+
+ // Clean up the pending request
+ s.logRequestsMutex.Lock()
+ delete(s.pendingLogRequests, requestKey)
+ s.logRequestsMutex.Unlock()
+}
+
// unregisterWorker removes a worker connection
func (s *WorkerGrpcServer) unregisterWorker(workerID string) {
s.connMutex.Lock()
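
Note: handleTaskLogResponse above assumes the worker echoes back the same WorkerId and TaskId it was asked about, since those two values form the correlation key. A hedged sketch of the worker-side reply, using the message and field names visible in this diff (the outgoing channel, the oneof wrapper field name, and the source of the log entries follow the AdminMessage pattern shown above and are assumptions, not code from this change):

func respondWithTaskLogs(out chan<- *worker_pb.WorkerMessage, req *worker_pb.TaskLogRequest, entries []*worker_pb.TaskLogEntry) {
	// Wrap the response in the WorkerMessage oneof so the admin's message
	// switch dispatches it to handleTaskLogResponse.
	out <- &worker_pb.WorkerMessage{
		Message: &worker_pb.WorkerMessage_TaskLogResponse{
			TaskLogResponse: &worker_pb.TaskLogResponse{
				TaskId:     req.TaskId,
				WorkerId:   req.WorkerId,
				Success:    true,
				LogEntries: entries,
			},
		},
	}
}
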
@@ -453,6 +503,112 @@ func (s *WorkerGrpcServer) GetConnectedWorkers() []string {
return workers
}
+// RequestTaskLogs requests execution logs from a worker for a specific task
+func (s *WorkerGrpcServer) RequestTaskLogs(workerID, taskID string, maxEntries int32, logLevel string) ([]*worker_pb.TaskLogEntry, error) {
+ s.connMutex.RLock()
+ conn, exists := s.connections[workerID]
+ s.connMutex.RUnlock()
+
+ if !exists {
+ return nil, fmt.Errorf("worker %s is not connected", workerID)
+ }
+
+ // Create response channel for this request
+ responseCh := make(chan *worker_pb.TaskLogResponse, 1)
+ requestKey := fmt.Sprintf("%s:%s", workerID, taskID)
+
+ // Register pending request
+ requestContext := &LogRequestContext{
+ TaskID: taskID,
+ WorkerID: workerID,
+ ResponseCh: responseCh,
+ Timeout: time.Now().Add(10 * time.Second),
+ }
+
+ s.logRequestsMutex.Lock()
+ s.pendingLogRequests[requestKey] = requestContext
+ s.logRequestsMutex.Unlock()
+
+ // Create log request message
+ logRequest := &worker_pb.AdminMessage{
+ AdminId: "admin-server",
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.AdminMessage_TaskLogRequest{
+ TaskLogRequest: &worker_pb.TaskLogRequest{
+ TaskId: taskID,
+ WorkerId: workerID,
+ IncludeMetadata: true,
+ MaxEntries: maxEntries,
+ LogLevel: logLevel,
+ },
+ },
+ }
+
+ // Send the request through the worker's outgoing channel
+ select {
+ case conn.outgoing <- logRequest:
+ glog.V(1).Infof("Log request sent to worker %s for task %s", workerID, taskID)
+ case <-time.After(5 * time.Second):
+ // Clean up pending request on timeout
+ s.logRequestsMutex.Lock()
+ delete(s.pendingLogRequests, requestKey)
+ s.logRequestsMutex.Unlock()
+ return nil, fmt.Errorf("timeout sending log request to worker %s", workerID)
+ }
+
+ // Wait for response
+ select {
+ case response := <-responseCh:
+ if !response.Success {
+ return nil, fmt.Errorf("worker log request failed: %s", response.ErrorMessage)
+ }
+ glog.V(1).Infof("Received %d log entries for task %s from worker %s", len(response.LogEntries), taskID, workerID)
+ return response.LogEntries, nil
+ case <-time.After(10 * time.Second):
+ // Clean up pending request on timeout
+ s.logRequestsMutex.Lock()
+ delete(s.pendingLogRequests, requestKey)
+ s.logRequestsMutex.Unlock()
+ return nil, fmt.Errorf("timeout waiting for log response from worker %s", workerID)
+ }
+}
+
+// RequestTaskLogsFromAllWorkers requests logs for a task from all connected workers
+func (s *WorkerGrpcServer) RequestTaskLogsFromAllWorkers(taskID string, maxEntries int32, logLevel string) (map[string][]*worker_pb.TaskLogEntry, error) {
+ s.connMutex.RLock()
+ workerIDs := make([]string, 0, len(s.connections))
+ for workerID := range s.connections {
+ workerIDs = append(workerIDs, workerID)
+ }
+ s.connMutex.RUnlock()
+
+ results := make(map[string][]*worker_pb.TaskLogEntry)
+
+ for _, workerID := range workerIDs {
+ logs, err := s.RequestTaskLogs(workerID, taskID, maxEntries, logLevel)
+ if err != nil {
+ glog.V(1).Infof("Failed to get logs from worker %s for task %s: %v", workerID, taskID, err)
+ // Record a synthetic error entry under a separate "<workerID>_error" key for debugging
+ results[workerID+"_error"] = []*worker_pb.TaskLogEntry{
+ {
+ Timestamp: time.Now().Unix(),
+ Level: "ERROR",
+ Message: fmt.Sprintf("Failed to retrieve logs from worker %s: %v", workerID, err),
+ Fields: map[string]string{"source": "admin"},
+ },
+ }
+ continue
+ }
+ if len(logs) > 0 {
+ results[workerID] = logs
+ } else {
+ glog.V(2).Infof("No logs found for task %s on worker %s", taskID, workerID)
+ }
+ }
+
+ return results, nil
+}
+
// convertTaskParameters converts task parameters to protobuf format
func convertTaskParameters(params map[string]interface{}) map[string]string {
result := make(map[string]string)
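
Note: a minimal usage sketch of the two new entry points from admin-side code (the worker and task identifiers, the entry limit, the "INFO" level string, and the workerGrpcServer variable are illustrative, not defined by this change):

	// Ask one specific worker first; fall back to polling every connected worker.
	entries, err := workerGrpcServer.RequestTaskLogs("worker-1", "task-123", 100, "INFO")
	if err != nil {
		glog.V(1).Infof("direct log fetch failed, broadcasting: %v", err)
		allLogs, _ := workerGrpcServer.RequestTaskLogsFromAllWorkers("task-123", 100, "INFO")
		for workerID, workerEntries := range allLogs {
			glog.Infof("worker %s returned %d log entries", workerID, len(workerEntries))
		}
		return
	}
	glog.Infof("received %d log entries", len(entries))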