diff options
Diffstat (limited to 'weed/admin/dash/config_persistence.go')
| -rw-r--r-- | weed/admin/dash/config_persistence.go | 545 |
1 files changed, 545 insertions, 0 deletions
diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index b6b3074ab..1fe1a9b42 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -1,11 +1,15 @@ package dash import ( + "encoding/json" "fmt" "os" "path/filepath" + "sort" + "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/admin/maintenance" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" @@ -33,6 +37,12 @@ const ( BalanceTaskConfigJSONFile = "task_balance.json" ReplicationTaskConfigJSONFile = "task_replication.json" + // Task persistence subdirectories and settings + TasksSubdir = "tasks" + TaskDetailsSubdir = "task_details" + TaskLogsSubdir = "task_logs" + MaxCompletedTasks = 10 // Only keep last 10 completed tasks + ConfigDirPermissions = 0755 ConfigFilePermissions = 0644 ) @@ -45,6 +55,35 @@ type ( ReplicationTaskConfig = worker_pb.ReplicationTaskConfig ) +// isValidTaskID validates that a task ID is safe for use in file paths +// This prevents path traversal attacks by ensuring the task ID doesn't contain +// path separators or parent directory references +func isValidTaskID(taskID string) bool { + if taskID == "" { + return false + } + + // Reject task IDs with leading or trailing whitespace + if strings.TrimSpace(taskID) != taskID { + return false + } + + // Check for path traversal patterns + if strings.Contains(taskID, "/") || + strings.Contains(taskID, "\\") || + strings.Contains(taskID, "..") || + strings.Contains(taskID, ":") { + return false + } + + // Additional safety: ensure it's not just dots or empty after trim + if taskID == "." || taskID == ".." { + return false + } + + return true +} + // ConfigPersistence handles saving and loading configuration files type ConfigPersistence struct { dataDir string @@ -688,3 +727,509 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies)) return policy } + +// SaveTaskDetail saves detailed task information to disk +func (cp *ConfigPersistence) SaveTaskDetail(taskID string, detail *maintenance.TaskDetailData) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save task detail") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskDetailDir := filepath.Join(cp.dataDir, TaskDetailsSubdir) + if err := os.MkdirAll(taskDetailDir, ConfigDirPermissions); err != nil { + return fmt.Errorf("failed to create task details directory: %w", err) + } + + // Save task detail as JSON for easy reading and debugging + taskDetailPath := filepath.Join(taskDetailDir, fmt.Sprintf("%s.json", taskID)) + jsonData, err := json.MarshalIndent(detail, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal task detail to JSON: %w", err) + } + + if err := os.WriteFile(taskDetailPath, jsonData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write task detail file: %w", err) + } + + glog.V(2).Infof("Saved task detail for task %s to %s", taskID, taskDetailPath) + return nil +} + +// LoadTaskDetail loads detailed task information from disk +func (cp *ConfigPersistence) LoadTaskDetail(taskID string) (*maintenance.TaskDetailData, error) { + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory specified, cannot load task detail") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskDetailPath := filepath.Join(cp.dataDir, TaskDetailsSubdir, fmt.Sprintf("%s.json", taskID)) + if _, err := os.Stat(taskDetailPath); os.IsNotExist(err) { + return nil, fmt.Errorf("task detail file not found: %s", taskID) + } + + jsonData, err := os.ReadFile(taskDetailPath) + if err != nil { + return nil, fmt.Errorf("failed to read task detail file: %w", err) + } + + var detail maintenance.TaskDetailData + if err := json.Unmarshal(jsonData, &detail); err != nil { + return nil, fmt.Errorf("failed to unmarshal task detail JSON: %w", err) + } + + glog.V(2).Infof("Loaded task detail for task %s from %s", taskID, taskDetailPath) + return &detail, nil +} + +// SaveTaskExecutionLogs saves execution logs for a task +func (cp *ConfigPersistence) SaveTaskExecutionLogs(taskID string, logs []*maintenance.TaskExecutionLog) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save task logs") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskLogsDir := filepath.Join(cp.dataDir, TaskLogsSubdir) + if err := os.MkdirAll(taskLogsDir, ConfigDirPermissions); err != nil { + return fmt.Errorf("failed to create task logs directory: %w", err) + } + + // Save logs as JSON for easy reading + taskLogsPath := filepath.Join(taskLogsDir, fmt.Sprintf("%s.json", taskID)) + logsData := struct { + TaskID string `json:"task_id"` + Logs []*maintenance.TaskExecutionLog `json:"logs"` + }{ + TaskID: taskID, + Logs: logs, + } + jsonData, err := json.MarshalIndent(logsData, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal task logs to JSON: %w", err) + } + + if err := os.WriteFile(taskLogsPath, jsonData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write task logs file: %w", err) + } + + glog.V(2).Infof("Saved %d execution logs for task %s to %s", len(logs), taskID, taskLogsPath) + return nil +} + +// LoadTaskExecutionLogs loads execution logs for a task +func (cp *ConfigPersistence) LoadTaskExecutionLogs(taskID string) ([]*maintenance.TaskExecutionLog, error) { + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory specified, cannot load task logs") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskLogsPath := filepath.Join(cp.dataDir, TaskLogsSubdir, fmt.Sprintf("%s.json", taskID)) + if _, err := os.Stat(taskLogsPath); os.IsNotExist(err) { + // Return empty slice if logs don't exist yet + return []*maintenance.TaskExecutionLog{}, nil + } + + jsonData, err := os.ReadFile(taskLogsPath) + if err != nil { + return nil, fmt.Errorf("failed to read task logs file: %w", err) + } + + var logsData struct { + TaskID string `json:"task_id"` + Logs []*maintenance.TaskExecutionLog `json:"logs"` + } + if err := json.Unmarshal(jsonData, &logsData); err != nil { + return nil, fmt.Errorf("failed to unmarshal task logs JSON: %w", err) + } + + glog.V(2).Infof("Loaded %d execution logs for task %s from %s", len(logsData.Logs), taskID, taskLogsPath) + return logsData.Logs, nil +} + +// DeleteTaskDetail removes task detail and logs from disk +func (cp *ConfigPersistence) DeleteTaskDetail(taskID string) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot delete task detail") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + // Delete task detail file + taskDetailPath := filepath.Join(cp.dataDir, TaskDetailsSubdir, fmt.Sprintf("%s.json", taskID)) + if err := os.Remove(taskDetailPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to delete task detail file: %w", err) + } + + // Delete task logs file + taskLogsPath := filepath.Join(cp.dataDir, TaskLogsSubdir, fmt.Sprintf("%s.json", taskID)) + if err := os.Remove(taskLogsPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to delete task logs file: %w", err) + } + + glog.V(2).Infof("Deleted task detail and logs for task %s", taskID) + return nil +} + +// ListTaskDetails returns a list of all task IDs that have stored details +func (cp *ConfigPersistence) ListTaskDetails() ([]string, error) { + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory specified, cannot list task details") + } + + taskDetailDir := filepath.Join(cp.dataDir, TaskDetailsSubdir) + if _, err := os.Stat(taskDetailDir); os.IsNotExist(err) { + return []string{}, nil + } + + entries, err := os.ReadDir(taskDetailDir) + if err != nil { + return nil, fmt.Errorf("failed to read task details directory: %w", err) + } + + var taskIDs []string + for _, entry := range entries { + if !entry.IsDir() && filepath.Ext(entry.Name()) == ".json" { + taskID := entry.Name()[:len(entry.Name())-5] // Remove .json extension + taskIDs = append(taskIDs, taskID) + } + } + + return taskIDs, nil +} + +// CleanupCompletedTasks removes old completed tasks beyond the retention limit +func (cp *ConfigPersistence) CleanupCompletedTasks() error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot cleanup completed tasks") + } + + tasksDir := filepath.Join(cp.dataDir, TasksSubdir) + if _, err := os.Stat(tasksDir); os.IsNotExist(err) { + return nil // No tasks directory, nothing to cleanup + } + + // Load all tasks and find completed/failed ones + allTasks, err := cp.LoadAllTaskStates() + if err != nil { + return fmt.Errorf("failed to load tasks for cleanup: %w", err) + } + + // Filter completed and failed tasks, sort by completion time + var completedTasks []*maintenance.MaintenanceTask + for _, task := range allTasks { + if (task.Status == maintenance.TaskStatusCompleted || task.Status == maintenance.TaskStatusFailed) && task.CompletedAt != nil { + completedTasks = append(completedTasks, task) + } + } + + // Sort by completion time (most recent first) + sort.Slice(completedTasks, func(i, j int) bool { + return completedTasks[i].CompletedAt.After(*completedTasks[j].CompletedAt) + }) + + // Keep only the most recent MaxCompletedTasks, delete the rest + if len(completedTasks) > MaxCompletedTasks { + tasksToDelete := completedTasks[MaxCompletedTasks:] + for _, task := range tasksToDelete { + if err := cp.DeleteTaskState(task.ID); err != nil { + glog.Warningf("Failed to delete old completed task %s: %v", task.ID, err) + } else { + glog.V(2).Infof("Cleaned up old completed task %s (completed: %v)", task.ID, task.CompletedAt) + } + } + glog.V(1).Infof("Cleaned up %d old completed tasks (keeping %d most recent)", len(tasksToDelete), MaxCompletedTasks) + } + + return nil +} + +// SaveTaskState saves a task state to protobuf file +func (cp *ConfigPersistence) SaveTaskState(task *maintenance.MaintenanceTask) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save task state") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(task.ID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", task.ID) + } + + tasksDir := filepath.Join(cp.dataDir, TasksSubdir) + if err := os.MkdirAll(tasksDir, ConfigDirPermissions); err != nil { + return fmt.Errorf("failed to create tasks directory: %w", err) + } + + taskFilePath := filepath.Join(tasksDir, fmt.Sprintf("%s.pb", task.ID)) + + // Convert task to protobuf + pbTask := cp.maintenanceTaskToProtobuf(task) + taskStateFile := &worker_pb.TaskStateFile{ + Task: pbTask, + LastUpdated: time.Now().Unix(), + AdminVersion: "unknown", // TODO: add version info + } + + pbData, err := proto.Marshal(taskStateFile) + if err != nil { + return fmt.Errorf("failed to marshal task state protobuf: %w", err) + } + + if err := os.WriteFile(taskFilePath, pbData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write task state file: %w", err) + } + + glog.V(2).Infof("Saved task state for task %s to %s", task.ID, taskFilePath) + return nil +} + +// LoadTaskState loads a task state from protobuf file +func (cp *ConfigPersistence) LoadTaskState(taskID string) (*maintenance.MaintenanceTask, error) { + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory specified, cannot load task state") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskFilePath := filepath.Join(cp.dataDir, TasksSubdir, fmt.Sprintf("%s.pb", taskID)) + if _, err := os.Stat(taskFilePath); os.IsNotExist(err) { + return nil, fmt.Errorf("task state file not found: %s", taskID) + } + + pbData, err := os.ReadFile(taskFilePath) + if err != nil { + return nil, fmt.Errorf("failed to read task state file: %w", err) + } + + var taskStateFile worker_pb.TaskStateFile + if err := proto.Unmarshal(pbData, &taskStateFile); err != nil { + return nil, fmt.Errorf("failed to unmarshal task state protobuf: %w", err) + } + + // Convert protobuf to maintenance task + task := cp.protobufToMaintenanceTask(taskStateFile.Task) + + glog.V(2).Infof("Loaded task state for task %s from %s", taskID, taskFilePath) + return task, nil +} + +// LoadAllTaskStates loads all task states from disk +func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask, error) { + if cp.dataDir == "" { + return []*maintenance.MaintenanceTask{}, nil + } + + tasksDir := filepath.Join(cp.dataDir, TasksSubdir) + if _, err := os.Stat(tasksDir); os.IsNotExist(err) { + return []*maintenance.MaintenanceTask{}, nil + } + + entries, err := os.ReadDir(tasksDir) + if err != nil { + return nil, fmt.Errorf("failed to read tasks directory: %w", err) + } + + var tasks []*maintenance.MaintenanceTask + for _, entry := range entries { + if !entry.IsDir() && filepath.Ext(entry.Name()) == ".pb" { + taskID := entry.Name()[:len(entry.Name())-3] // Remove .pb extension + task, err := cp.LoadTaskState(taskID) + if err != nil { + glog.Warningf("Failed to load task state for %s: %v", taskID, err) + continue + } + tasks = append(tasks, task) + } + } + + glog.V(1).Infof("Loaded %d task states from disk", len(tasks)) + return tasks, nil +} + +// DeleteTaskState removes a task state file from disk +func (cp *ConfigPersistence) DeleteTaskState(taskID string) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot delete task state") + } + + // Validate task ID to prevent path traversal + if !isValidTaskID(taskID) { + return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID) + } + + taskFilePath := filepath.Join(cp.dataDir, TasksSubdir, fmt.Sprintf("%s.pb", taskID)) + if err := os.Remove(taskFilePath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to delete task state file: %w", err) + } + + glog.V(2).Infof("Deleted task state for task %s", taskID) + return nil +} + +// maintenanceTaskToProtobuf converts a MaintenanceTask to protobuf format +func (cp *ConfigPersistence) maintenanceTaskToProtobuf(task *maintenance.MaintenanceTask) *worker_pb.MaintenanceTaskData { + pbTask := &worker_pb.MaintenanceTaskData{ + Id: task.ID, + Type: string(task.Type), + Priority: cp.priorityToString(task.Priority), + Status: string(task.Status), + VolumeId: task.VolumeID, + Server: task.Server, + Collection: task.Collection, + Reason: task.Reason, + CreatedAt: task.CreatedAt.Unix(), + ScheduledAt: task.ScheduledAt.Unix(), + WorkerId: task.WorkerID, + Error: task.Error, + Progress: task.Progress, + RetryCount: int32(task.RetryCount), + MaxRetries: int32(task.MaxRetries), + CreatedBy: task.CreatedBy, + CreationContext: task.CreationContext, + DetailedReason: task.DetailedReason, + Tags: task.Tags, + } + + // Handle optional timestamps + if task.StartedAt != nil { + pbTask.StartedAt = task.StartedAt.Unix() + } + if task.CompletedAt != nil { + pbTask.CompletedAt = task.CompletedAt.Unix() + } + + // Convert assignment history + if task.AssignmentHistory != nil { + for _, record := range task.AssignmentHistory { + pbRecord := &worker_pb.TaskAssignmentRecord{ + WorkerId: record.WorkerID, + WorkerAddress: record.WorkerAddress, + AssignedAt: record.AssignedAt.Unix(), + Reason: record.Reason, + } + if record.UnassignedAt != nil { + pbRecord.UnassignedAt = record.UnassignedAt.Unix() + } + pbTask.AssignmentHistory = append(pbTask.AssignmentHistory, pbRecord) + } + } + + // Convert typed parameters if available + if task.TypedParams != nil { + pbTask.TypedParams = task.TypedParams + } + + return pbTask +} + +// protobufToMaintenanceTask converts protobuf format to MaintenanceTask +func (cp *ConfigPersistence) protobufToMaintenanceTask(pbTask *worker_pb.MaintenanceTaskData) *maintenance.MaintenanceTask { + task := &maintenance.MaintenanceTask{ + ID: pbTask.Id, + Type: maintenance.MaintenanceTaskType(pbTask.Type), + Priority: cp.stringToPriority(pbTask.Priority), + Status: maintenance.MaintenanceTaskStatus(pbTask.Status), + VolumeID: pbTask.VolumeId, + Server: pbTask.Server, + Collection: pbTask.Collection, + Reason: pbTask.Reason, + CreatedAt: time.Unix(pbTask.CreatedAt, 0), + ScheduledAt: time.Unix(pbTask.ScheduledAt, 0), + WorkerID: pbTask.WorkerId, + Error: pbTask.Error, + Progress: pbTask.Progress, + RetryCount: int(pbTask.RetryCount), + MaxRetries: int(pbTask.MaxRetries), + CreatedBy: pbTask.CreatedBy, + CreationContext: pbTask.CreationContext, + DetailedReason: pbTask.DetailedReason, + Tags: pbTask.Tags, + } + + // Handle optional timestamps + if pbTask.StartedAt > 0 { + startTime := time.Unix(pbTask.StartedAt, 0) + task.StartedAt = &startTime + } + if pbTask.CompletedAt > 0 { + completedTime := time.Unix(pbTask.CompletedAt, 0) + task.CompletedAt = &completedTime + } + + // Convert assignment history + if pbTask.AssignmentHistory != nil { + task.AssignmentHistory = make([]*maintenance.TaskAssignmentRecord, 0, len(pbTask.AssignmentHistory)) + for _, pbRecord := range pbTask.AssignmentHistory { + record := &maintenance.TaskAssignmentRecord{ + WorkerID: pbRecord.WorkerId, + WorkerAddress: pbRecord.WorkerAddress, + AssignedAt: time.Unix(pbRecord.AssignedAt, 0), + Reason: pbRecord.Reason, + } + if pbRecord.UnassignedAt > 0 { + unassignedTime := time.Unix(pbRecord.UnassignedAt, 0) + record.UnassignedAt = &unassignedTime + } + task.AssignmentHistory = append(task.AssignmentHistory, record) + } + } + + // Convert typed parameters if available + if pbTask.TypedParams != nil { + task.TypedParams = pbTask.TypedParams + } + + return task +} + +// priorityToString converts MaintenanceTaskPriority to string for protobuf storage +func (cp *ConfigPersistence) priorityToString(priority maintenance.MaintenanceTaskPriority) string { + switch priority { + case maintenance.PriorityLow: + return "low" + case maintenance.PriorityNormal: + return "normal" + case maintenance.PriorityHigh: + return "high" + case maintenance.PriorityCritical: + return "critical" + default: + return "normal" + } +} + +// stringToPriority converts string from protobuf to MaintenanceTaskPriority +func (cp *ConfigPersistence) stringToPriority(priorityStr string) maintenance.MaintenanceTaskPriority { + switch priorityStr { + case "low": + return maintenance.PriorityLow + case "normal": + return maintenance.PriorityNormal + case "high": + return maintenance.PriorityHigh + case "critical": + return maintenance.PriorityCritical + default: + return maintenance.PriorityNormal + } +} |
