diff options
Diffstat (limited to 'weed/admin/dash/admin_server.go')
| -rw-r--r-- | weed/admin/dash/admin_server.go | 265 |
1 files changed, 255 insertions, 10 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 376f3edc7..3f135ee1b 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net/http" + "strconv" "time" "github.com/gin-gonic/gin" @@ -878,6 +879,46 @@ func (as *AdminServer) GetMaintenanceTask(c *gin.Context) { c.JSON(http.StatusOK, task) } +// GetMaintenanceTaskDetailAPI returns detailed task information via API +func (as *AdminServer) GetMaintenanceTaskDetailAPI(c *gin.Context) { + taskID := c.Param("id") + taskDetail, err := as.GetMaintenanceTaskDetail(taskID) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "Task detail not found", "details": err.Error()}) + return + } + + c.JSON(http.StatusOK, taskDetail) +} + +// ShowMaintenanceTaskDetail renders the task detail page +func (as *AdminServer) ShowMaintenanceTaskDetail(c *gin.Context) { + username := c.GetString("username") + if username == "" { + username = "admin" // Default fallback + } + + taskID := c.Param("id") + taskDetail, err := as.GetMaintenanceTaskDetail(taskID) + if err != nil { + c.HTML(http.StatusNotFound, "error.html", gin.H{ + "error": "Task not found", + "details": err.Error(), + }) + return + } + + // Prepare data for template + data := gin.H{ + "username": username, + "task": taskDetail.Task, + "taskDetail": taskDetail, + "title": fmt.Sprintf("Task Detail - %s", taskID), + } + + c.HTML(http.StatusOK, "task_detail.html", data) +} + // CancelMaintenanceTask cancels a pending maintenance task func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) { taskID := c.Param("id") @@ -1041,27 +1082,65 @@ func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, erro // getMaintenanceTasks returns all maintenance tasks func (as *AdminServer) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) { if as.maintenanceManager == nil { - return []*MaintenanceTask{}, nil + return []*maintenance.MaintenanceTask{}, nil + } + + // Collect all tasks from memory across all statuses + allTasks := []*maintenance.MaintenanceTask{} + statuses := []maintenance.MaintenanceTaskStatus{ + maintenance.TaskStatusPending, + maintenance.TaskStatusAssigned, + maintenance.TaskStatusInProgress, + maintenance.TaskStatusCompleted, + maintenance.TaskStatusFailed, + maintenance.TaskStatusCancelled, + } + + for _, status := range statuses { + tasks := as.maintenanceManager.GetTasks(status, "", 0) + allTasks = append(allTasks, tasks...) + } + + // Also load any persisted tasks that might not be in memory + if as.configPersistence != nil { + persistedTasks, err := as.configPersistence.LoadAllTaskStates() + if err == nil { + // Add any persisted tasks not already in memory + for _, persistedTask := range persistedTasks { + found := false + for _, memoryTask := range allTasks { + if memoryTask.ID == persistedTask.ID { + found = true + break + } + } + if !found { + allTasks = append(allTasks, persistedTask) + } + } + } } - return as.maintenanceManager.GetTasks(maintenance.TaskStatusPending, "", 0), nil + + return allTasks, nil } // getMaintenanceTask returns a specific maintenance task -func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, error) { +func (as *AdminServer) getMaintenanceTask(taskID string) (*maintenance.MaintenanceTask, error) { if as.maintenanceManager == nil { return nil, fmt.Errorf("maintenance manager not initialized") } // Search for the task across all statuses since we don't know which status it has - statuses := []MaintenanceTaskStatus{ - TaskStatusPending, - TaskStatusAssigned, - TaskStatusInProgress, - TaskStatusCompleted, - TaskStatusFailed, - TaskStatusCancelled, + statuses := []maintenance.MaintenanceTaskStatus{ + maintenance.TaskStatusPending, + maintenance.TaskStatusAssigned, + maintenance.TaskStatusInProgress, + maintenance.TaskStatusCompleted, + maintenance.TaskStatusFailed, + maintenance.TaskStatusCancelled, } + // First, search for the task in memory across all statuses for _, status := range statuses { tasks := as.maintenanceManager.GetTasks(status, "", 0) // Get all tasks with this status for _, task := range tasks { @@ -1071,9 +1150,133 @@ func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, erro } } + // If not found in memory, try to load from persistent storage + if as.configPersistence != nil { + task, err := as.configPersistence.LoadTaskState(taskID) + if err == nil { + glog.V(2).Infof("Loaded task %s from persistent storage", taskID) + return task, nil + } + glog.V(2).Infof("Task %s not found in persistent storage: %v", taskID, err) + } + return nil, fmt.Errorf("task %s not found", taskID) } +// GetMaintenanceTaskDetail returns comprehensive task details including logs and assignment history +func (as *AdminServer) GetMaintenanceTaskDetail(taskID string) (*maintenance.TaskDetailData, error) { + // Get basic task information + task, err := as.getMaintenanceTask(taskID) + if err != nil { + return nil, err + } + + // Create task detail structure from the loaded task + taskDetail := &maintenance.TaskDetailData{ + Task: task, + AssignmentHistory: task.AssignmentHistory, // Use assignment history from persisted task + ExecutionLogs: []*maintenance.TaskExecutionLog{}, + RelatedTasks: []*maintenance.MaintenanceTask{}, + LastUpdated: time.Now(), + } + + if taskDetail.AssignmentHistory == nil { + taskDetail.AssignmentHistory = []*maintenance.TaskAssignmentRecord{} + } + + // Get worker information if task is assigned + if task.WorkerID != "" { + workers := as.maintenanceManager.GetWorkers() + for _, worker := range workers { + if worker.ID == task.WorkerID { + taskDetail.WorkerInfo = worker + break + } + } + } + + // Get execution logs from worker if task is active/completed and worker is connected + if task.Status == maintenance.TaskStatusInProgress || task.Status == maintenance.TaskStatusCompleted { + if as.workerGrpcServer != nil && task.WorkerID != "" { + workerLogs, err := as.workerGrpcServer.RequestTaskLogs(task.WorkerID, taskID, 100, "") + if err == nil && len(workerLogs) > 0 { + // Convert worker logs to maintenance logs + for _, workerLog := range workerLogs { + maintenanceLog := &maintenance.TaskExecutionLog{ + Timestamp: time.Unix(workerLog.Timestamp, 0), + Level: workerLog.Level, + Message: workerLog.Message, + Source: "worker", + TaskID: taskID, + WorkerID: task.WorkerID, + } + // carry structured fields if present + if len(workerLog.Fields) > 0 { + maintenanceLog.Fields = make(map[string]string, len(workerLog.Fields)) + for k, v := range workerLog.Fields { + maintenanceLog.Fields[k] = v + } + } + // carry optional progress/status + if workerLog.Progress != 0 { + p := float64(workerLog.Progress) + maintenanceLog.Progress = &p + } + if workerLog.Status != "" { + maintenanceLog.Status = workerLog.Status + } + taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, maintenanceLog) + } + } else if err != nil { + // Add a diagnostic log entry when worker logs cannot be retrieved + diagnosticLog := &maintenance.TaskExecutionLog{ + Timestamp: time.Now(), + Level: "WARNING", + Message: fmt.Sprintf("Failed to retrieve worker logs: %v", err), + Source: "admin", + TaskID: taskID, + WorkerID: task.WorkerID, + } + taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog) + glog.V(1).Infof("Failed to get worker logs for task %s from worker %s: %v", taskID, task.WorkerID, err) + } + } else { + // Add diagnostic information when worker is not available + reason := "worker gRPC server not available" + if task.WorkerID == "" { + reason = "no worker assigned to task" + } + diagnosticLog := &maintenance.TaskExecutionLog{ + Timestamp: time.Now(), + Level: "INFO", + Message: fmt.Sprintf("Worker logs not available: %s", reason), + Source: "admin", + TaskID: taskID, + WorkerID: task.WorkerID, + } + taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog) + } + } + + // Get related tasks (other tasks on same volume/server) + if task.VolumeID != 0 || task.Server != "" { + allTasks := as.maintenanceManager.GetTasks("", "", 50) // Get recent tasks + for _, relatedTask := range allTasks { + if relatedTask.ID != taskID && + (relatedTask.VolumeID == task.VolumeID || relatedTask.Server == task.Server) { + taskDetail.RelatedTasks = append(taskDetail.RelatedTasks, relatedTask) + } + } + } + + // Save updated task detail to disk + if err := as.configPersistence.SaveTaskDetail(taskID, taskDetail); err != nil { + glog.V(1).Infof("Failed to save task detail for %s: %v", taskID, err) + } + + return taskDetail, nil +} + // getMaintenanceWorkers returns all maintenance workers func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) { if as.maintenanceManager == nil { @@ -1157,6 +1360,34 @@ func (as *AdminServer) getMaintenanceWorkerDetails(workerID string) (*WorkerDeta }, nil } +// GetWorkerLogs fetches logs from a specific worker for a task +func (as *AdminServer) GetWorkerLogs(c *gin.Context) { + workerID := c.Param("id") + taskID := c.Query("taskId") + maxEntriesStr := c.DefaultQuery("maxEntries", "100") + logLevel := c.DefaultQuery("logLevel", "") + + maxEntries := int32(100) + if maxEntriesStr != "" { + if parsed, err := strconv.ParseInt(maxEntriesStr, 10, 32); err == nil { + maxEntries = int32(parsed) + } + } + + if as.workerGrpcServer == nil { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Worker gRPC server not available"}) + return + } + + logs, err := as.workerGrpcServer.RequestTaskLogs(workerID, taskID, maxEntries, logLevel) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("Failed to get logs from worker: %v", err)}) + return + } + + c.JSON(http.StatusOK, gin.H{"worker_id": workerID, "task_id": taskID, "logs": logs, "count": len(logs)}) +} + // getMaintenanceStats returns maintenance statistics func (as *AdminServer) getMaintenanceStats() (*MaintenanceStats, error) { if as.maintenanceManager == nil { @@ -1376,6 +1607,20 @@ func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer { // InitMaintenanceManager initializes the maintenance manager func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) { s.maintenanceManager = maintenance.NewMaintenanceManager(s, config) + + // Set up task persistence if config persistence is available + if s.configPersistence != nil { + queue := s.maintenanceManager.GetQueue() + if queue != nil { + queue.SetPersistence(s.configPersistence) + + // Load tasks from persistence on startup + if err := queue.LoadTasksFromPersistence(); err != nil { + glog.Errorf("Failed to load tasks from persistence: %v", err) + } + } + } + glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled) } |
