aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/dash/config_persistence.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/dash/config_persistence.go')
-rw-r--r--weed/admin/dash/config_persistence.go545
1 files changed, 545 insertions, 0 deletions
diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go
index b6b3074ab..1fe1a9b42 100644
--- a/weed/admin/dash/config_persistence.go
+++ b/weed/admin/dash/config_persistence.go
@@ -1,11 +1,15 @@
package dash
import (
+ "encoding/json"
"fmt"
"os"
"path/filepath"
+ "sort"
+ "strings"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
@@ -33,6 +37,12 @@ const (
BalanceTaskConfigJSONFile = "task_balance.json"
ReplicationTaskConfigJSONFile = "task_replication.json"
+ // Task persistence subdirectories and settings
+ TasksSubdir = "tasks"
+ TaskDetailsSubdir = "task_details"
+ TaskLogsSubdir = "task_logs"
+ MaxCompletedTasks = 10 // Only keep last 10 completed tasks
+
ConfigDirPermissions = 0755
ConfigFilePermissions = 0644
)
@@ -45,6 +55,35 @@ type (
ReplicationTaskConfig = worker_pb.ReplicationTaskConfig
)
+// isValidTaskID validates that a task ID is safe for use in file paths
+// This prevents path traversal attacks by ensuring the task ID doesn't contain
+// path separators or parent directory references
+func isValidTaskID(taskID string) bool {
+ if taskID == "" {
+ return false
+ }
+
+ // Reject task IDs with leading or trailing whitespace
+ if strings.TrimSpace(taskID) != taskID {
+ return false
+ }
+
+ // Check for path traversal patterns
+ if strings.Contains(taskID, "/") ||
+ strings.Contains(taskID, "\\") ||
+ strings.Contains(taskID, "..") ||
+ strings.Contains(taskID, ":") {
+ return false
+ }
+
+ // Additional safety: ensure it's not just dots or empty after trim
+ if taskID == "." || taskID == ".." {
+ return false
+ }
+
+ return true
+}
+
// ConfigPersistence handles saving and loading configuration files
type ConfigPersistence struct {
dataDir string
@@ -688,3 +727,509 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
return policy
}
+
+// SaveTaskDetail saves detailed task information to disk
+func (cp *ConfigPersistence) SaveTaskDetail(taskID string, detail *maintenance.TaskDetailData) error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot save task detail")
+ }
+
+ // Validate task ID to prevent path traversal
+ if !isValidTaskID(taskID) {
+ return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
+ }
+
+ taskDetailDir := filepath.Join(cp.dataDir, TaskDetailsSubdir)
+ if err := os.MkdirAll(taskDetailDir, ConfigDirPermissions); err != nil {
+ return fmt.Errorf("failed to create task details directory: %w", err)
+ }
+
+ // Save task detail as JSON for easy reading and debugging
+ taskDetailPath := filepath.Join(taskDetailDir, fmt.Sprintf("%s.json", taskID))
+ jsonData, err := json.MarshalIndent(detail, "", " ")
+ if err != nil {
+ return fmt.Errorf("failed to marshal task detail to JSON: %w", err)
+ }
+
+ if err := os.WriteFile(taskDetailPath, jsonData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write task detail file: %w", err)
+ }
+
+ glog.V(2).Infof("Saved task detail for task %s to %s", taskID, taskDetailPath)
+ return nil
+}
+
+// LoadTaskDetail loads detailed task information from disk
+func (cp *ConfigPersistence) LoadTaskDetail(taskID string) (*maintenance.TaskDetailData, error) {
+ if cp.dataDir == "" {
+ return nil, fmt.Errorf("no data directory specified, cannot load task detail")
+ }
+
+ // Validate task ID to prevent path traversal
+ if !isValidTaskID(taskID) {
+ return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
+ }
+
+ taskDetailPath := filepath.Join(cp.dataDir, TaskDetailsSubdir, fmt.Sprintf("%s.json", taskID))
+ if _, err := os.Stat(taskDetailPath); os.IsNotExist(err) {
+ return nil, fmt.Errorf("task detail file not found: %s", taskID)
+ }
+
+ jsonData, err := os.ReadFile(taskDetailPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read task detail file: %w", err)
+ }
+
+ var detail maintenance.TaskDetailData
+ if err := json.Unmarshal(jsonData, &detail); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal task detail JSON: %w", err)
+ }
+
+ glog.V(2).Infof("Loaded task detail for task %s from %s", taskID, taskDetailPath)
+ return &detail, nil
+}
+
+// SaveTaskExecutionLogs saves execution logs for a task
+func (cp *ConfigPersistence) SaveTaskExecutionLogs(taskID string, logs []*maintenance.TaskExecutionLog) error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot save task logs")
+ }
+
+ // Validate task ID to prevent path traversal
+ if !isValidTaskID(taskID) {
+ return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
+ }
+
+ taskLogsDir := filepath.Join(cp.dataDir, TaskLogsSubdir)
+ if err := os.MkdirAll(taskLogsDir, ConfigDirPermissions); err != nil {
+ return fmt.Errorf("failed to create task logs directory: %w", err)
+ }
+
+ // Save logs as JSON for easy reading
+ taskLogsPath := filepath.Join(taskLogsDir, fmt.Sprintf("%s.json", taskID))
+ logsData := struct {
+ TaskID string `json:"task_id"`
+ Logs []*maintenance.TaskExecutionLog `json:"logs"`
+ }{
+ TaskID: taskID,
+ Logs: logs,
+ }
+ jsonData, err := json.MarshalIndent(logsData, "", " ")
+ if err != nil {
+ return fmt.Errorf("failed to marshal task logs to JSON: %w", err)
+ }
+
+ if err := os.WriteFile(taskLogsPath, jsonData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write task logs file: %w", err)
+ }
+
+ glog.V(2).Infof("Saved %d execution logs for task %s to %s", len(logs), taskID, taskLogsPath)
+ return nil
+}
+
+// LoadTaskExecutionLogs loads execution logs for a task
+func (cp *ConfigPersistence) LoadTaskExecutionLogs(taskID string) ([]*maintenance.TaskExecutionLog, error) {
+ if cp.dataDir == "" {
+ return nil, fmt.Errorf("no data directory specified, cannot load task logs")
+ }
+
+ // Validate task ID to prevent path traversal
+ if !isValidTaskID(taskID) {
+ return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
+ }
+
+ taskLogsPath := filepath.Join(cp.dataDir, TaskLogsSubdir, fmt.Sprintf("%s.json", taskID))
+ if _, err := os.Stat(taskLogsPath); os.IsNotExist(err) {
+ // Return empty slice if logs don't exist yet
+ return []*maintenance.TaskExecutionLog{}, nil
+ }
+
+ jsonData, err := os.ReadFile(taskLogsPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read task logs file: %w", err)
+ }
+
+ var logsData struct {
+ TaskID string `json:"task_id"`
+ Logs []*maintenance.TaskExecutionLog `json:"logs"`
+ }
+ if err := json.Unmarshal(jsonData, &logsData); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal task logs JSON: %w", err)
+ }
+
+ glog.V(2).Infof("Loaded %d execution logs for task %s from %s", len(logsData.Logs), taskID, taskLogsPath)
+ return logsData.Logs, nil
+}
+
+// DeleteTaskDetail removes task detail and logs from disk
+func (cp *ConfigPersistence) DeleteTaskDetail(taskID string) error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot delete task detail")
+ }
+
+ // Validate task ID to prevent path traversal
+ if !isValidTaskID(taskID) {
+ return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
+ }
+
+ // Delete task detail file
+ taskDetailPath := filepath.Join(cp.dataDir, TaskDetailsSubdir, fmt.Sprintf("%s.json", taskID))
+ if err := os.Remove(taskDetailPath); err != nil && !os.IsNotExist(err) {
+ return fmt.Errorf("failed to delete task detail file: %w", err)
+ }
+
+ // Delete task logs file
+ taskLogsPath := filepath.Join(cp.dataDir, TaskLogsSubdir, fmt.Sprintf("%s.json", taskID))
+ if err := os.Remove(taskLogsPath); err != nil && !os.IsNotExist(err) {
+ return fmt.Errorf("failed to delete task logs file: %w", err)
+ }
+
+ glog.V(2).Infof("Deleted task detail and logs for task %s", taskID)
+ return nil
+}
+
+// ListTaskDetails returns a list of all task IDs that have stored details
+func (cp *ConfigPersistence) ListTaskDetails() ([]string, error) {
+ if cp.dataDir == "" {
+ return nil, fmt.Errorf("no data directory specified, cannot list task details")
+ }
+
+ taskDetailDir := filepath.Join(cp.dataDir, TaskDetailsSubdir)
+ if _, err := os.Stat(taskDetailDir); os.IsNotExist(err) {
+ return []string{}, nil
+ }
+
+ entries, err := os.ReadDir(taskDetailDir)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read task details directory: %w", err)
+ }
+
+ var taskIDs []string
+ for _, entry := range entries {
+ if !entry.IsDir() && filepath.Ext(entry.Name()) == ".json" {
+ taskID := entry.Name()[:len(entry.Name())-5] // Remove .json extension
+ taskIDs = append(taskIDs, taskID)
+ }
+ }
+
+ return taskIDs, nil
+}
+
+// CleanupCompletedTasks removes old completed tasks beyond the retention limit
+func (cp *ConfigPersistence) CleanupCompletedTasks() error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot cleanup completed tasks")
+ }
+
+ tasksDir := filepath.Join(cp.dataDir, TasksSubdir)
+ if _, err := os.Stat(tasksDir); os.IsNotExist(err) {
+ return nil // No tasks directory, nothing to cleanup
+ }
+
+ // Load all tasks and find completed/failed ones
+ allTasks, err := cp.LoadAllTaskStates()
+ if err != nil {
+ return fmt.Errorf("failed to load tasks for cleanup: %w", err)
+ }
+
+ // Filter completed and failed tasks, sort by completion time
+ var completedTasks []*maintenance.MaintenanceTask
+ for _, task := range allTasks {
+ if (task.Status == maintenance.TaskStatusCompleted || task.Status == maintenance.TaskStatusFailed) && task.CompletedAt != nil {
+ completedTasks = append(completedTasks, task)
+ }
+ }
+
+ // Sort by completion time (most recent first)
+ sort.Slice(completedTasks, func(i, j int) bool {
+ return completedTasks[i].CompletedAt.After(*completedTasks[j].CompletedAt)
+ })
+
+ // Keep only the most recent MaxCompletedTasks, delete the rest
+ if len(completedTasks) > MaxCompletedTasks {
+ tasksToDelete := completedTasks[MaxCompletedTasks:]
+ for _, task := range tasksToDelete {
+ if err := cp.DeleteTaskState(task.ID); err != nil {
+ glog.Warningf("Failed to delete old completed task %s: %v", task.ID, err)
+ } else {
+ glog.V(2).Infof("Cleaned up old completed task %s (completed: %v)", task.ID, task.CompletedAt)
+ }
+ }
+ glog.V(1).Infof("Cleaned up %d old completed tasks (keeping %d most recent)", len(tasksToDelete), MaxCompletedTasks)
+ }
+
+ return nil
+}
+
+// SaveTaskState saves a task state to protobuf file
+func (cp *ConfigPersistence) SaveTaskState(task *maintenance.MaintenanceTask) error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot save task state")
+ }
+
+ // Validate task ID to prevent path traversal
+ if !isValidTaskID(task.ID) {
+ return fmt.Errorf("invalid task ID: %q contains illegal path characters", task.ID)
+ }
+
+ tasksDir := filepath.Join(cp.dataDir, TasksSubdir)
+ if err := os.MkdirAll(tasksDir, ConfigDirPermissions); err != nil {
+ return fmt.Errorf("failed to create tasks directory: %w", err)
+ }
+
+ taskFilePath := filepath.Join(tasksDir, fmt.Sprintf("%s.pb", task.ID))
+
+ // Convert task to protobuf
+ pbTask := cp.maintenanceTaskToProtobuf(task)
+ taskStateFile := &worker_pb.TaskStateFile{
+ Task: pbTask,
+ LastUpdated: time.Now().Unix(),
+ AdminVersion: "unknown", // TODO: add version info
+ }
+
+ pbData, err := proto.Marshal(taskStateFile)
+ if err != nil {
+ return fmt.Errorf("failed to marshal task state protobuf: %w", err)
+ }
+
+ if err := os.WriteFile(taskFilePath, pbData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write task state file: %w", err)
+ }
+
+ glog.V(2).Infof("Saved task state for task %s to %s", task.ID, taskFilePath)
+ return nil
+}
+
+// LoadTaskState loads a task state from protobuf file
+func (cp *ConfigPersistence) LoadTaskState(taskID string) (*maintenance.MaintenanceTask, error) {
+ if cp.dataDir == "" {
+ return nil, fmt.Errorf("no data directory specified, cannot load task state")
+ }
+
+ // Validate task ID to prevent path traversal
+ if !isValidTaskID(taskID) {
+ return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
+ }
+
+ taskFilePath := filepath.Join(cp.dataDir, TasksSubdir, fmt.Sprintf("%s.pb", taskID))
+ if _, err := os.Stat(taskFilePath); os.IsNotExist(err) {
+ return nil, fmt.Errorf("task state file not found: %s", taskID)
+ }
+
+ pbData, err := os.ReadFile(taskFilePath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read task state file: %w", err)
+ }
+
+ var taskStateFile worker_pb.TaskStateFile
+ if err := proto.Unmarshal(pbData, &taskStateFile); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal task state protobuf: %w", err)
+ }
+
+ // Convert protobuf to maintenance task
+ task := cp.protobufToMaintenanceTask(taskStateFile.Task)
+
+ glog.V(2).Infof("Loaded task state for task %s from %s", taskID, taskFilePath)
+ return task, nil
+}
+
+// LoadAllTaskStates loads all task states from disk
+func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask, error) {
+ if cp.dataDir == "" {
+ return []*maintenance.MaintenanceTask{}, nil
+ }
+
+ tasksDir := filepath.Join(cp.dataDir, TasksSubdir)
+ if _, err := os.Stat(tasksDir); os.IsNotExist(err) {
+ return []*maintenance.MaintenanceTask{}, nil
+ }
+
+ entries, err := os.ReadDir(tasksDir)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read tasks directory: %w", err)
+ }
+
+ var tasks []*maintenance.MaintenanceTask
+ for _, entry := range entries {
+ if !entry.IsDir() && filepath.Ext(entry.Name()) == ".pb" {
+ taskID := entry.Name()[:len(entry.Name())-3] // Remove .pb extension
+ task, err := cp.LoadTaskState(taskID)
+ if err != nil {
+ glog.Warningf("Failed to load task state for %s: %v", taskID, err)
+ continue
+ }
+ tasks = append(tasks, task)
+ }
+ }
+
+ glog.V(1).Infof("Loaded %d task states from disk", len(tasks))
+ return tasks, nil
+}
+
+// DeleteTaskState removes a task state file from disk
+func (cp *ConfigPersistence) DeleteTaskState(taskID string) error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot delete task state")
+ }
+
+ // Validate task ID to prevent path traversal
+ if !isValidTaskID(taskID) {
+ return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
+ }
+
+ taskFilePath := filepath.Join(cp.dataDir, TasksSubdir, fmt.Sprintf("%s.pb", taskID))
+ if err := os.Remove(taskFilePath); err != nil && !os.IsNotExist(err) {
+ return fmt.Errorf("failed to delete task state file: %w", err)
+ }
+
+ glog.V(2).Infof("Deleted task state for task %s", taskID)
+ return nil
+}
+
+// maintenanceTaskToProtobuf converts a MaintenanceTask to protobuf format
+func (cp *ConfigPersistence) maintenanceTaskToProtobuf(task *maintenance.MaintenanceTask) *worker_pb.MaintenanceTaskData {
+ pbTask := &worker_pb.MaintenanceTaskData{
+ Id: task.ID,
+ Type: string(task.Type),
+ Priority: cp.priorityToString(task.Priority),
+ Status: string(task.Status),
+ VolumeId: task.VolumeID,
+ Server: task.Server,
+ Collection: task.Collection,
+ Reason: task.Reason,
+ CreatedAt: task.CreatedAt.Unix(),
+ ScheduledAt: task.ScheduledAt.Unix(),
+ WorkerId: task.WorkerID,
+ Error: task.Error,
+ Progress: task.Progress,
+ RetryCount: int32(task.RetryCount),
+ MaxRetries: int32(task.MaxRetries),
+ CreatedBy: task.CreatedBy,
+ CreationContext: task.CreationContext,
+ DetailedReason: task.DetailedReason,
+ Tags: task.Tags,
+ }
+
+ // Handle optional timestamps
+ if task.StartedAt != nil {
+ pbTask.StartedAt = task.StartedAt.Unix()
+ }
+ if task.CompletedAt != nil {
+ pbTask.CompletedAt = task.CompletedAt.Unix()
+ }
+
+ // Convert assignment history
+ if task.AssignmentHistory != nil {
+ for _, record := range task.AssignmentHistory {
+ pbRecord := &worker_pb.TaskAssignmentRecord{
+ WorkerId: record.WorkerID,
+ WorkerAddress: record.WorkerAddress,
+ AssignedAt: record.AssignedAt.Unix(),
+ Reason: record.Reason,
+ }
+ if record.UnassignedAt != nil {
+ pbRecord.UnassignedAt = record.UnassignedAt.Unix()
+ }
+ pbTask.AssignmentHistory = append(pbTask.AssignmentHistory, pbRecord)
+ }
+ }
+
+ // Convert typed parameters if available
+ if task.TypedParams != nil {
+ pbTask.TypedParams = task.TypedParams
+ }
+
+ return pbTask
+}
+
+// protobufToMaintenanceTask converts protobuf format to MaintenanceTask
+func (cp *ConfigPersistence) protobufToMaintenanceTask(pbTask *worker_pb.MaintenanceTaskData) *maintenance.MaintenanceTask {
+ task := &maintenance.MaintenanceTask{
+ ID: pbTask.Id,
+ Type: maintenance.MaintenanceTaskType(pbTask.Type),
+ Priority: cp.stringToPriority(pbTask.Priority),
+ Status: maintenance.MaintenanceTaskStatus(pbTask.Status),
+ VolumeID: pbTask.VolumeId,
+ Server: pbTask.Server,
+ Collection: pbTask.Collection,
+ Reason: pbTask.Reason,
+ CreatedAt: time.Unix(pbTask.CreatedAt, 0),
+ ScheduledAt: time.Unix(pbTask.ScheduledAt, 0),
+ WorkerID: pbTask.WorkerId,
+ Error: pbTask.Error,
+ Progress: pbTask.Progress,
+ RetryCount: int(pbTask.RetryCount),
+ MaxRetries: int(pbTask.MaxRetries),
+ CreatedBy: pbTask.CreatedBy,
+ CreationContext: pbTask.CreationContext,
+ DetailedReason: pbTask.DetailedReason,
+ Tags: pbTask.Tags,
+ }
+
+ // Handle optional timestamps
+ if pbTask.StartedAt > 0 {
+ startTime := time.Unix(pbTask.StartedAt, 0)
+ task.StartedAt = &startTime
+ }
+ if pbTask.CompletedAt > 0 {
+ completedTime := time.Unix(pbTask.CompletedAt, 0)
+ task.CompletedAt = &completedTime
+ }
+
+ // Convert assignment history
+ if pbTask.AssignmentHistory != nil {
+ task.AssignmentHistory = make([]*maintenance.TaskAssignmentRecord, 0, len(pbTask.AssignmentHistory))
+ for _, pbRecord := range pbTask.AssignmentHistory {
+ record := &maintenance.TaskAssignmentRecord{
+ WorkerID: pbRecord.WorkerId,
+ WorkerAddress: pbRecord.WorkerAddress,
+ AssignedAt: time.Unix(pbRecord.AssignedAt, 0),
+ Reason: pbRecord.Reason,
+ }
+ if pbRecord.UnassignedAt > 0 {
+ unassignedTime := time.Unix(pbRecord.UnassignedAt, 0)
+ record.UnassignedAt = &unassignedTime
+ }
+ task.AssignmentHistory = append(task.AssignmentHistory, record)
+ }
+ }
+
+ // Convert typed parameters if available
+ if pbTask.TypedParams != nil {
+ task.TypedParams = pbTask.TypedParams
+ }
+
+ return task
+}
+
+// priorityToString converts MaintenanceTaskPriority to string for protobuf storage
+func (cp *ConfigPersistence) priorityToString(priority maintenance.MaintenanceTaskPriority) string {
+ switch priority {
+ case maintenance.PriorityLow:
+ return "low"
+ case maintenance.PriorityNormal:
+ return "normal"
+ case maintenance.PriorityHigh:
+ return "high"
+ case maintenance.PriorityCritical:
+ return "critical"
+ default:
+ return "normal"
+ }
+}
+
+// stringToPriority converts string from protobuf to MaintenanceTaskPriority
+func (cp *ConfigPersistence) stringToPriority(priorityStr string) maintenance.MaintenanceTaskPriority {
+ switch priorityStr {
+ case "low":
+ return maintenance.PriorityLow
+ case "normal":
+ return maintenance.PriorityNormal
+ case "high":
+ return maintenance.PriorityHigh
+ case "critical":
+ return maintenance.PriorityCritical
+ default:
+ return maintenance.PriorityNormal
+ }
+}