aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/dash/admin_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/dash/admin_server.go')
-rw-r--r--weed/admin/dash/admin_server.go265
1 files changed, 255 insertions, 10 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go
index 376f3edc7..3f135ee1b 100644
--- a/weed/admin/dash/admin_server.go
+++ b/weed/admin/dash/admin_server.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"net/http"
+ "strconv"
"time"
"github.com/gin-gonic/gin"
@@ -878,6 +879,46 @@ func (as *AdminServer) GetMaintenanceTask(c *gin.Context) {
c.JSON(http.StatusOK, task)
}
+// GetMaintenanceTaskDetailAPI returns detailed task information via API
+func (as *AdminServer) GetMaintenanceTaskDetailAPI(c *gin.Context) {
+ taskID := c.Param("id")
+ taskDetail, err := as.GetMaintenanceTaskDetail(taskID)
+ if err != nil {
+ c.JSON(http.StatusNotFound, gin.H{"error": "Task detail not found", "details": err.Error()})
+ return
+ }
+
+ c.JSON(http.StatusOK, taskDetail)
+}
+
+// ShowMaintenanceTaskDetail renders the task detail page
+func (as *AdminServer) ShowMaintenanceTaskDetail(c *gin.Context) {
+ username := c.GetString("username")
+ if username == "" {
+ username = "admin" // Default fallback
+ }
+
+ taskID := c.Param("id")
+ taskDetail, err := as.GetMaintenanceTaskDetail(taskID)
+ if err != nil {
+ c.HTML(http.StatusNotFound, "error.html", gin.H{
+ "error": "Task not found",
+ "details": err.Error(),
+ })
+ return
+ }
+
+ // Prepare data for template
+ data := gin.H{
+ "username": username,
+ "task": taskDetail.Task,
+ "taskDetail": taskDetail,
+ "title": fmt.Sprintf("Task Detail - %s", taskID),
+ }
+
+ c.HTML(http.StatusOK, "task_detail.html", data)
+}
+
// CancelMaintenanceTask cancels a pending maintenance task
func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) {
taskID := c.Param("id")
@@ -1041,27 +1082,65 @@ func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, erro
// getMaintenanceTasks returns all maintenance tasks
func (as *AdminServer) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) {
if as.maintenanceManager == nil {
- return []*MaintenanceTask{}, nil
+ return []*maintenance.MaintenanceTask{}, nil
+ }
+
+ // Collect all tasks from memory across all statuses
+ allTasks := []*maintenance.MaintenanceTask{}
+ statuses := []maintenance.MaintenanceTaskStatus{
+ maintenance.TaskStatusPending,
+ maintenance.TaskStatusAssigned,
+ maintenance.TaskStatusInProgress,
+ maintenance.TaskStatusCompleted,
+ maintenance.TaskStatusFailed,
+ maintenance.TaskStatusCancelled,
+ }
+
+ for _, status := range statuses {
+ tasks := as.maintenanceManager.GetTasks(status, "", 0)
+ allTasks = append(allTasks, tasks...)
+ }
+
+ // Also load any persisted tasks that might not be in memory
+ if as.configPersistence != nil {
+ persistedTasks, err := as.configPersistence.LoadAllTaskStates()
+ if err == nil {
+ // Add any persisted tasks not already in memory
+ for _, persistedTask := range persistedTasks {
+ found := false
+ for _, memoryTask := range allTasks {
+ if memoryTask.ID == persistedTask.ID {
+ found = true
+ break
+ }
+ }
+ if !found {
+ allTasks = append(allTasks, persistedTask)
+ }
+ }
+ }
}
- return as.maintenanceManager.GetTasks(maintenance.TaskStatusPending, "", 0), nil
+
+ return allTasks, nil
}
// getMaintenanceTask returns a specific maintenance task
-func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, error) {
+func (as *AdminServer) getMaintenanceTask(taskID string) (*maintenance.MaintenanceTask, error) {
if as.maintenanceManager == nil {
return nil, fmt.Errorf("maintenance manager not initialized")
}
// Search for the task across all statuses since we don't know which status it has
- statuses := []MaintenanceTaskStatus{
- TaskStatusPending,
- TaskStatusAssigned,
- TaskStatusInProgress,
- TaskStatusCompleted,
- TaskStatusFailed,
- TaskStatusCancelled,
+ statuses := []maintenance.MaintenanceTaskStatus{
+ maintenance.TaskStatusPending,
+ maintenance.TaskStatusAssigned,
+ maintenance.TaskStatusInProgress,
+ maintenance.TaskStatusCompleted,
+ maintenance.TaskStatusFailed,
+ maintenance.TaskStatusCancelled,
}
+ // First, search for the task in memory across all statuses
for _, status := range statuses {
tasks := as.maintenanceManager.GetTasks(status, "", 0) // Get all tasks with this status
for _, task := range tasks {
@@ -1071,9 +1150,133 @@ func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, erro
}
}
+ // If not found in memory, try to load from persistent storage
+ if as.configPersistence != nil {
+ task, err := as.configPersistence.LoadTaskState(taskID)
+ if err == nil {
+ glog.V(2).Infof("Loaded task %s from persistent storage", taskID)
+ return task, nil
+ }
+ glog.V(2).Infof("Task %s not found in persistent storage: %v", taskID, err)
+ }
+
return nil, fmt.Errorf("task %s not found", taskID)
}
+// GetMaintenanceTaskDetail returns comprehensive task details including logs and assignment history
+func (as *AdminServer) GetMaintenanceTaskDetail(taskID string) (*maintenance.TaskDetailData, error) {
+ // Get basic task information
+ task, err := as.getMaintenanceTask(taskID)
+ if err != nil {
+ return nil, err
+ }
+
+ // Create task detail structure from the loaded task
+ taskDetail := &maintenance.TaskDetailData{
+ Task: task,
+ AssignmentHistory: task.AssignmentHistory, // Use assignment history from persisted task
+ ExecutionLogs: []*maintenance.TaskExecutionLog{},
+ RelatedTasks: []*maintenance.MaintenanceTask{},
+ LastUpdated: time.Now(),
+ }
+
+ if taskDetail.AssignmentHistory == nil {
+ taskDetail.AssignmentHistory = []*maintenance.TaskAssignmentRecord{}
+ }
+
+ // Get worker information if task is assigned
+ if task.WorkerID != "" {
+ workers := as.maintenanceManager.GetWorkers()
+ for _, worker := range workers {
+ if worker.ID == task.WorkerID {
+ taskDetail.WorkerInfo = worker
+ break
+ }
+ }
+ }
+
+ // Get execution logs from worker if task is active/completed and worker is connected
+ if task.Status == maintenance.TaskStatusInProgress || task.Status == maintenance.TaskStatusCompleted {
+ if as.workerGrpcServer != nil && task.WorkerID != "" {
+ workerLogs, err := as.workerGrpcServer.RequestTaskLogs(task.WorkerID, taskID, 100, "")
+ if err == nil && len(workerLogs) > 0 {
+ // Convert worker logs to maintenance logs
+ for _, workerLog := range workerLogs {
+ maintenanceLog := &maintenance.TaskExecutionLog{
+ Timestamp: time.Unix(workerLog.Timestamp, 0),
+ Level: workerLog.Level,
+ Message: workerLog.Message,
+ Source: "worker",
+ TaskID: taskID,
+ WorkerID: task.WorkerID,
+ }
+ // carry structured fields if present
+ if len(workerLog.Fields) > 0 {
+ maintenanceLog.Fields = make(map[string]string, len(workerLog.Fields))
+ for k, v := range workerLog.Fields {
+ maintenanceLog.Fields[k] = v
+ }
+ }
+ // carry optional progress/status
+ if workerLog.Progress != 0 {
+ p := float64(workerLog.Progress)
+ maintenanceLog.Progress = &p
+ }
+ if workerLog.Status != "" {
+ maintenanceLog.Status = workerLog.Status
+ }
+ taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, maintenanceLog)
+ }
+ } else if err != nil {
+ // Add a diagnostic log entry when worker logs cannot be retrieved
+ diagnosticLog := &maintenance.TaskExecutionLog{
+ Timestamp: time.Now(),
+ Level: "WARNING",
+ Message: fmt.Sprintf("Failed to retrieve worker logs: %v", err),
+ Source: "admin",
+ TaskID: taskID,
+ WorkerID: task.WorkerID,
+ }
+ taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog)
+ glog.V(1).Infof("Failed to get worker logs for task %s from worker %s: %v", taskID, task.WorkerID, err)
+ }
+ } else {
+ // Add diagnostic information when worker is not available
+ reason := "worker gRPC server not available"
+ if task.WorkerID == "" {
+ reason = "no worker assigned to task"
+ }
+ diagnosticLog := &maintenance.TaskExecutionLog{
+ Timestamp: time.Now(),
+ Level: "INFO",
+ Message: fmt.Sprintf("Worker logs not available: %s", reason),
+ Source: "admin",
+ TaskID: taskID,
+ WorkerID: task.WorkerID,
+ }
+ taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog)
+ }
+ }
+
+ // Get related tasks (other tasks on same volume/server)
+ if task.VolumeID != 0 || task.Server != "" {
+ allTasks := as.maintenanceManager.GetTasks("", "", 50) // Get recent tasks
+ for _, relatedTask := range allTasks {
+ if relatedTask.ID != taskID &&
+ (relatedTask.VolumeID == task.VolumeID || relatedTask.Server == task.Server) {
+ taskDetail.RelatedTasks = append(taskDetail.RelatedTasks, relatedTask)
+ }
+ }
+ }
+
+ // Save updated task detail to disk
+ if err := as.configPersistence.SaveTaskDetail(taskID, taskDetail); err != nil {
+ glog.V(1).Infof("Failed to save task detail for %s: %v", taskID, err)
+ }
+
+ return taskDetail, nil
+}
+
// getMaintenanceWorkers returns all maintenance workers
func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) {
if as.maintenanceManager == nil {
@@ -1157,6 +1360,34 @@ func (as *AdminServer) getMaintenanceWorkerDetails(workerID string) (*WorkerDeta
}, nil
}
+// GetWorkerLogs fetches logs from a specific worker for a task
+func (as *AdminServer) GetWorkerLogs(c *gin.Context) {
+ workerID := c.Param("id")
+ taskID := c.Query("taskId")
+ maxEntriesStr := c.DefaultQuery("maxEntries", "100")
+ logLevel := c.DefaultQuery("logLevel", "")
+
+ maxEntries := int32(100)
+ if maxEntriesStr != "" {
+ if parsed, err := strconv.ParseInt(maxEntriesStr, 10, 32); err == nil {
+ maxEntries = int32(parsed)
+ }
+ }
+
+ if as.workerGrpcServer == nil {
+ c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Worker gRPC server not available"})
+ return
+ }
+
+ logs, err := as.workerGrpcServer.RequestTaskLogs(workerID, taskID, maxEntries, logLevel)
+ if err != nil {
+ c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("Failed to get logs from worker: %v", err)})
+ return
+ }
+
+ c.JSON(http.StatusOK, gin.H{"worker_id": workerID, "task_id": taskID, "logs": logs, "count": len(logs)})
+}
+
// getMaintenanceStats returns maintenance statistics
func (as *AdminServer) getMaintenanceStats() (*MaintenanceStats, error) {
if as.maintenanceManager == nil {
@@ -1376,6 +1607,20 @@ func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer {
// InitMaintenanceManager initializes the maintenance manager
func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) {
s.maintenanceManager = maintenance.NewMaintenanceManager(s, config)
+
+ // Set up task persistence if config persistence is available
+ if s.configPersistence != nil {
+ queue := s.maintenanceManager.GetQueue()
+ if queue != nil {
+ queue.SetPersistence(s.configPersistence)
+
+ // Load tasks from persistence on startup
+ if err := queue.LoadTasksFromPersistence(); err != nil {
+ glog.Errorf("Failed to load tasks from persistence: %v", err)
+ }
+ }
+ }
+
glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled)
}