| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-09 21:47:29 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-09 21:47:29 -0700 |
| commit | 25bbf4c3d44b1c8a9aa4980e37ed399ec249f771 (patch) | |
| tree | aabb2ce3c6f55e4cf1e26ce2b6989086c17830bc /weed/admin/dash/worker_grpc_server.go | |
| parent | 3ac2a2e22d863753a6b568596fbe9d76d03023b5 (diff) | |
Admin UI: Fetch task logs (#7114)
* show task details
* loading tasks
* task UI works
* generic rendering
* rendering the export link
* removing placementConflicts from task parameters
* remove TaskSourceLocation
* remove "Server ID" column
* rendering balance task source
* sources and targets
* fix ec task generation
* move info
* render timeline
* simplified worker id
* simplify
* read task logs from worker
* isValidTaskID
* address comments
* Update weed/worker/tasks/balance/execution.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/worker/tasks/erasure_coding/ec_task.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/worker/tasks/task_log_handler.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix shard ids
* plan distributing shard id
* rendering planned shards in task details
* remove Conflicts
* worker logs correctly
* pass in dc and rack
* task logging
* Update weed/admin/maintenance/maintenance_queue.go
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* display log details
* logs have fields now
* sort field keys
* fix link
* fix collection filtering
* avoid hard coded ec shard counts
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Diffstat (limited to 'weed/admin/dash/worker_grpc_server.go')
| -rw-r--r-- | weed/admin/dash/worker_grpc_server.go | 164 |
1 file changed, 160 insertions, 4 deletions
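The diff below adds a small request/response correlation layer to the admin's worker gRPC stream: each outstanding log request is tracked in `pendingLogRequests`, keyed by `workerID:taskID`, and the matching `TaskLogResponse` from the worker is handed back to the waiting caller through a buffered channel, with timeouts on both the send (5s) and the wait (10s).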
```diff
diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go
index 3b4312235..78ba6d7de 100644
--- a/weed/admin/dash/worker_grpc_server.go
+++ b/weed/admin/dash/worker_grpc_server.go
@@ -26,6 +26,10 @@ type WorkerGrpcServer struct {
 	connections map[string]*WorkerConnection
 	connMutex   sync.RWMutex
 
+	// Log request correlation
+	pendingLogRequests map[string]*LogRequestContext
+	logRequestsMutex   sync.RWMutex
+
 	// gRPC server
 	grpcServer *grpc.Server
 	listener   net.Listener
@@ -33,6 +37,14 @@ type WorkerGrpcServer struct {
 	stopChan chan struct{}
 }
 
+// LogRequestContext tracks pending log requests
+type LogRequestContext struct {
+	TaskID     string
+	WorkerID   string
+	ResponseCh chan *worker_pb.TaskLogResponse
+	Timeout    time.Time
+}
+
 // WorkerConnection represents an active worker connection
 type WorkerConnection struct {
 	workerID string
@@ -49,9 +61,10 @@ type WorkerConnection struct {
 // NewWorkerGrpcServer creates a new gRPC server for worker connections
 func NewWorkerGrpcServer(adminServer *AdminServer) *WorkerGrpcServer {
 	return &WorkerGrpcServer{
-		adminServer: adminServer,
-		connections: make(map[string]*WorkerConnection),
-		stopChan:    make(chan struct{}),
+		adminServer:        adminServer,
+		connections:        make(map[string]*WorkerConnection),
+		pendingLogRequests: make(map[string]*LogRequestContext),
+		stopChan:           make(chan struct{}),
 	}
 }
@@ -264,6 +277,9 @@ func (s *WorkerGrpcServer) handleWorkerMessage(conn *WorkerConnection, msg *work
 	case *worker_pb.WorkerMessage_TaskComplete:
 		s.handleTaskCompletion(conn, m.TaskComplete)
 
+	case *worker_pb.WorkerMessage_TaskLogResponse:
+		s.handleTaskLogResponse(conn, m.TaskLogResponse)
+
 	case *worker_pb.WorkerMessage_Shutdown:
 		glog.Infof("Worker %s shutting down: %s", workerID, m.Shutdown.Reason)
 		s.unregisterWorker(workerID)
@@ -341,8 +357,13 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo
 		// Create basic params if none exist
 		taskParams = &worker_pb.TaskParams{
 			VolumeId:   task.VolumeID,
-			Server:     task.Server,
 			Collection: task.Collection,
+			Sources: []*worker_pb.TaskSource{
+				{
+					Node:     task.Server,
+					VolumeId: task.VolumeID,
+				},
+			},
 		}
 	}
@@ -396,6 +417,35 @@ func (s *WorkerGrpcServer) handleTaskCompletion(conn *WorkerConnection, completi
 	}
 }
 
+// handleTaskLogResponse processes task log responses from workers
+func (s *WorkerGrpcServer) handleTaskLogResponse(conn *WorkerConnection, response *worker_pb.TaskLogResponse) {
+	requestKey := fmt.Sprintf("%s:%s", response.WorkerId, response.TaskId)
+
+	s.logRequestsMutex.RLock()
+	requestContext, exists := s.pendingLogRequests[requestKey]
+	s.logRequestsMutex.RUnlock()
+
+	if !exists {
+		glog.Warningf("Received unexpected log response for task %s from worker %s", response.TaskId, response.WorkerId)
+		return
+	}
+
+	glog.V(1).Infof("Received log response for task %s from worker %s: %d entries", response.TaskId, response.WorkerId, len(response.LogEntries))
+
+	// Send response to waiting channel
+	select {
+	case requestContext.ResponseCh <- response:
+		// Response delivered successfully
+	case <-time.After(time.Second):
+		glog.Warningf("Failed to deliver log response for task %s from worker %s: timeout", response.TaskId, response.WorkerId)
+	}
+
+	// Clean up the pending request
+	s.logRequestsMutex.Lock()
+	delete(s.pendingLogRequests, requestKey)
+	s.logRequestsMutex.Unlock()
+}
+
 // unregisterWorker removes a worker connection
 func (s *WorkerGrpcServer) unregisterWorker(workerID string) {
 	s.connMutex.Lock()
@@ -453,6 +503,112 @@ func (s *WorkerGrpcServer) GetConnectedWorkers() []string {
 	return workers
 }
 
+// RequestTaskLogs requests execution logs from a worker for a specific task
+func (s *WorkerGrpcServer) RequestTaskLogs(workerID, taskID string, maxEntries int32, logLevel string) ([]*worker_pb.TaskLogEntry, error) {
+	s.connMutex.RLock()
+	conn, exists := s.connections[workerID]
+	s.connMutex.RUnlock()
+
+	if !exists {
+		return nil, fmt.Errorf("worker %s is not connected", workerID)
+	}
+
+	// Create response channel for this request
+	responseCh := make(chan *worker_pb.TaskLogResponse, 1)
+	requestKey := fmt.Sprintf("%s:%s", workerID, taskID)
+
+	// Register pending request
+	requestContext := &LogRequestContext{
+		TaskID:     taskID,
+		WorkerID:   workerID,
+		ResponseCh: responseCh,
+		Timeout:    time.Now().Add(10 * time.Second),
+	}
+
+	s.logRequestsMutex.Lock()
+	s.pendingLogRequests[requestKey] = requestContext
+	s.logRequestsMutex.Unlock()
+
+	// Create log request message
+	logRequest := &worker_pb.AdminMessage{
+		AdminId:   "admin-server",
+		Timestamp: time.Now().Unix(),
+		Message: &worker_pb.AdminMessage_TaskLogRequest{
+			TaskLogRequest: &worker_pb.TaskLogRequest{
+				TaskId:          taskID,
+				WorkerId:        workerID,
+				IncludeMetadata: true,
+				MaxEntries:      maxEntries,
+				LogLevel:        logLevel,
+			},
+		},
+	}
+
+	// Send the request through the worker's outgoing channel
+	select {
+	case conn.outgoing <- logRequest:
+		glog.V(1).Infof("Log request sent to worker %s for task %s", workerID, taskID)
+	case <-time.After(5 * time.Second):
+		// Clean up pending request on timeout
+		s.logRequestsMutex.Lock()
+		delete(s.pendingLogRequests, requestKey)
+		s.logRequestsMutex.Unlock()
+		return nil, fmt.Errorf("timeout sending log request to worker %s", workerID)
+	}
+
+	// Wait for response
+	select {
+	case response := <-responseCh:
+		if !response.Success {
+			return nil, fmt.Errorf("worker log request failed: %s", response.ErrorMessage)
+		}
+		glog.V(1).Infof("Received %d log entries for task %s from worker %s", len(response.LogEntries), taskID, workerID)
+		return response.LogEntries, nil
+	case <-time.After(10 * time.Second):
+		// Clean up pending request on timeout
+		s.logRequestsMutex.Lock()
+		delete(s.pendingLogRequests, requestKey)
+		s.logRequestsMutex.Unlock()
+		return nil, fmt.Errorf("timeout waiting for log response from worker %s", workerID)
+	}
+}
+
+// RequestTaskLogsFromAllWorkers requests logs for a task from all connected workers
+func (s *WorkerGrpcServer) RequestTaskLogsFromAllWorkers(taskID string, maxEntries int32, logLevel string) (map[string][]*worker_pb.TaskLogEntry, error) {
+	s.connMutex.RLock()
+	workerIDs := make([]string, 0, len(s.connections))
+	for workerID := range s.connections {
+		workerIDs = append(workerIDs, workerID)
+	}
+	s.connMutex.RUnlock()
+
+	results := make(map[string][]*worker_pb.TaskLogEntry)
+
+	for _, workerID := range workerIDs {
+		logs, err := s.RequestTaskLogs(workerID, taskID, maxEntries, logLevel)
+		if err != nil {
+			glog.V(1).Infof("Failed to get logs from worker %s for task %s: %v", workerID, taskID, err)
+			// Store empty result with error information for debugging
+			results[workerID+"_error"] = []*worker_pb.TaskLogEntry{
+				{
+					Timestamp: time.Now().Unix(),
+					Level:     "ERROR",
+					Message:   fmt.Sprintf("Failed to retrieve logs from worker %s: %v", workerID, err),
+					Fields:    map[string]string{"source": "admin"},
+				},
+			}
+			continue
+		}
+		if len(logs) > 0 {
+			results[workerID] = logs
+		} else {
+			glog.V(2).Infof("No logs found for task %s on worker %s", taskID, workerID)
+		}
+	}
+
+	return results, nil
+}
+
 // convertTaskParameters converts task parameters to protobuf format
 func convertTaskParameters(params map[string]interface{}) map[string]string {
 	result := make(map[string]string)
```
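To show how the new API is meant to be consumed, here is a minimal sketch of a caller inside the same `dash` package. The helper name `printTaskLogs`, the output formatting, and the filter arguments are illustrative assumptions; only `RequestTaskLogsFromAllWorkers` and the `TaskLogEntry` fields visible in the diff (`Timestamp`, `Level`, `Message`, `Fields`) come from this commit.

```go
// printTaskLogs is a hypothetical helper (not part of this commit) showing
// one way to consume the new API from inside the dash package.
func printTaskLogs(s *WorkerGrpcServer, taskID string) {
	// Ask every connected worker for up to 100 entries; the semantics of an
	// empty logLevel filter are an assumption here.
	logsByWorker, err := s.RequestTaskLogsFromAllWorkers(taskID, 100, "")
	if err != nil {
		glog.Errorf("failed to collect logs for task %s: %v", taskID, err)
		return
	}
	for workerID, entries := range logsByWorker {
		// Entries under a "<workerID>_error" key are synthetic placeholders
		// produced by RequestTaskLogsFromAllWorkers when a worker failed.
		for _, entry := range entries {
			fmt.Printf("[%s] %s %-5s %s %v\n",
				workerID,
				time.Unix(entry.Timestamp, 0).Format(time.RFC3339),
				entry.Level, entry.Message, entry.Fields)
		}
	}
}
```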

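For completeness, the worker-side reply is roughly the mirror image of the admin-side handler. The sketch below is an assumption built only from the message types visible in the diff above (`AdminMessage_TaskLogRequest` in, `WorkerMessage_TaskLogResponse` out); the actual handler lives in the worker packages touched by this PR, and the `send` callback stands in for the worker's gRPC stream, whose concrete type is not shown in this file.

```go
// replyWithTaskLogs is a hypothetical worker-side reply, inferred from the
// message types in the diff above. The send callback abstracts the worker's
// gRPC stream; its real type is not part of this file.
func replyWithTaskLogs(send func(*worker_pb.WorkerMessage) error,
	req *worker_pb.TaskLogRequest, entries []*worker_pb.TaskLogEntry) error {
	return send(&worker_pb.WorkerMessage{
		// The oneof wrapper is assumed to be named Message on WorkerMessage,
		// mirroring the Message field shown on AdminMessage in the diff.
		Message: &worker_pb.WorkerMessage_TaskLogResponse{
			TaskLogResponse: &worker_pb.TaskLogResponse{
				TaskId:     req.TaskId,
				WorkerId:   req.WorkerId,
				Success:    true,
				LogEntries: entries,
			},
		},
	})
}
```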