diff options
Diffstat (limited to 'weed/admin/dash/config_persistence.go')
| -rw-r--r-- | weed/admin/dash/config_persistence.go | 406 |
1 files changed, 40 insertions, 366 deletions
diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index 1fe1a9b42..75a6a86f2 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -12,9 +12,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/maintenance" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) @@ -24,18 +21,10 @@ const ( ConfigSubdir = "conf" // Configuration file names (protobuf binary) - MaintenanceConfigFile = "maintenance.pb" - VacuumTaskConfigFile = "task_vacuum.pb" - ECTaskConfigFile = "task_erasure_coding.pb" - BalanceTaskConfigFile = "task_balance.pb" - ReplicationTaskConfigFile = "task_replication.pb" + MaintenanceConfigFile = "maintenance.pb" // JSON reference files - MaintenanceConfigJSONFile = "maintenance.json" - VacuumTaskConfigJSONFile = "task_vacuum.json" - ECTaskConfigJSONFile = "task_erasure_coding.json" - BalanceTaskConfigJSONFile = "task_balance.json" - ReplicationTaskConfigJSONFile = "task_replication.json" + MaintenanceConfigJSONFile = "maintenance.json" // Task persistence subdirectories and settings TasksSubdir = "tasks" @@ -47,14 +36,6 @@ const ( ConfigFilePermissions = 0644 ) -// Task configuration types -type ( - VacuumTaskConfig = worker_pb.VacuumTaskConfig - ErasureCodingTaskConfig = worker_pb.ErasureCodingTaskConfig - BalanceTaskConfig = worker_pb.BalanceTaskConfig - ReplicationTaskConfig = worker_pb.ReplicationTaskConfig -) - // isValidTaskID validates that a task ID is safe for use in file paths // This prevents path traversal attacks by ensuring the task ID doesn't contain // path separators or parent directory references @@ -149,8 +130,6 @@ func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) if configData, err := os.ReadFile(configPath); err == nil { var config MaintenanceConfig if err := proto.Unmarshal(configData, &config); err == nil { - // Always populate policy from separate task configuration files - config.Policy = buildPolicyFromTaskConfigs() return &config, nil } } @@ -262,285 +241,6 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { return nil } -// SaveVacuumTaskConfig saves vacuum task configuration to protobuf file -func (cp *ConfigPersistence) SaveVacuumTaskConfig(config *VacuumTaskConfig) error { - return cp.saveTaskConfig(VacuumTaskConfigFile, config) -} - -// SaveVacuumTaskPolicy saves complete vacuum task policy to protobuf file -func (cp *ConfigPersistence) SaveVacuumTaskPolicy(policy *worker_pb.TaskPolicy) error { - return cp.saveTaskConfig(VacuumTaskConfigFile, policy) -} - -// LoadVacuumTaskConfig loads vacuum task configuration from protobuf file -func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) { - // Load as TaskPolicy and extract vacuum config - if taskPolicy, err := cp.LoadVacuumTaskPolicy(); err == nil && taskPolicy != nil { - if vacuumConfig := taskPolicy.GetVacuumConfig(); vacuumConfig != nil { - return vacuumConfig, nil - } - } - - // Return default config if no valid config found - return &VacuumTaskConfig{ - GarbageThreshold: 0.3, - MinVolumeAgeHours: 24, - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - }, nil -} - -// LoadVacuumTaskPolicy loads complete vacuum task policy from protobuf file -func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) { - if cp.dataDir == "" { - // Return default policy if no data directory - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 2, - RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds - CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ - VacuumConfig: &worker_pb.VacuumTaskConfig{ - GarbageThreshold: 0.3, - MinVolumeAgeHours: 24, - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - }, - }, - }, nil - } - - confDir := filepath.Join(cp.dataDir, ConfigSubdir) - configPath := filepath.Join(confDir, VacuumTaskConfigFile) - - // Check if file exists - if _, err := os.Stat(configPath); os.IsNotExist(err) { - // Return default policy if file doesn't exist - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 2, - RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds - CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ - VacuumConfig: &worker_pb.VacuumTaskConfig{ - GarbageThreshold: 0.3, - MinVolumeAgeHours: 24, - MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days - }, - }, - }, nil - } - - // Read file - configData, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read vacuum task config file: %w", err) - } - - // Try to unmarshal as TaskPolicy - var policy worker_pb.TaskPolicy - if err := proto.Unmarshal(configData, &policy); err == nil { - // Validate that it's actually a TaskPolicy with vacuum config - if policy.GetVacuumConfig() != nil { - glog.V(1).Infof("Loaded vacuum task policy from %s", configPath) - return &policy, nil - } - } - - return nil, fmt.Errorf("failed to unmarshal vacuum task configuration") -} - -// SaveErasureCodingTaskConfig saves EC task configuration to protobuf file -func (cp *ConfigPersistence) SaveErasureCodingTaskConfig(config *ErasureCodingTaskConfig) error { - return cp.saveTaskConfig(ECTaskConfigFile, config) -} - -// SaveErasureCodingTaskPolicy saves complete EC task policy to protobuf file -func (cp *ConfigPersistence) SaveErasureCodingTaskPolicy(policy *worker_pb.TaskPolicy) error { - return cp.saveTaskConfig(ECTaskConfigFile, policy) -} - -// LoadErasureCodingTaskConfig loads EC task configuration from protobuf file -func (cp *ConfigPersistence) LoadErasureCodingTaskConfig() (*ErasureCodingTaskConfig, error) { - // Load as TaskPolicy and extract EC config - if taskPolicy, err := cp.LoadErasureCodingTaskPolicy(); err == nil && taskPolicy != nil { - if ecConfig := taskPolicy.GetErasureCodingConfig(); ecConfig != nil { - return ecConfig, nil - } - } - - // Return default config if no valid config found - return &ErasureCodingTaskConfig{ - FullnessRatio: 0.9, - QuietForSeconds: 3600, - MinVolumeSizeMb: 1024, - CollectionFilter: "", - }, nil -} - -// LoadErasureCodingTaskPolicy loads complete EC task policy from protobuf file -func (cp *ConfigPersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) { - if cp.dataDir == "" { - // Return default policy if no data directory - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 1, - RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds - CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ - ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ - FullnessRatio: 0.9, - QuietForSeconds: 3600, - MinVolumeSizeMb: 1024, - CollectionFilter: "", - }, - }, - }, nil - } - - confDir := filepath.Join(cp.dataDir, ConfigSubdir) - configPath := filepath.Join(confDir, ECTaskConfigFile) - - // Check if file exists - if _, err := os.Stat(configPath); os.IsNotExist(err) { - // Return default policy if file doesn't exist - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 1, - RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds - CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ - ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ - FullnessRatio: 0.9, - QuietForSeconds: 3600, - MinVolumeSizeMb: 1024, - CollectionFilter: "", - }, - }, - }, nil - } - - // Read file - configData, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read EC task config file: %w", err) - } - - // Try to unmarshal as TaskPolicy - var policy worker_pb.TaskPolicy - if err := proto.Unmarshal(configData, &policy); err == nil { - // Validate that it's actually a TaskPolicy with EC config - if policy.GetErasureCodingConfig() != nil { - glog.V(1).Infof("Loaded EC task policy from %s", configPath) - return &policy, nil - } - } - - return nil, fmt.Errorf("failed to unmarshal EC task configuration") -} - -// SaveBalanceTaskConfig saves balance task configuration to protobuf file -func (cp *ConfigPersistence) SaveBalanceTaskConfig(config *BalanceTaskConfig) error { - return cp.saveTaskConfig(BalanceTaskConfigFile, config) -} - -// SaveBalanceTaskPolicy saves complete balance task policy to protobuf file -func (cp *ConfigPersistence) SaveBalanceTaskPolicy(policy *worker_pb.TaskPolicy) error { - return cp.saveTaskConfig(BalanceTaskConfigFile, policy) -} - -// LoadBalanceTaskConfig loads balance task configuration from protobuf file -func (cp *ConfigPersistence) LoadBalanceTaskConfig() (*BalanceTaskConfig, error) { - // Load as TaskPolicy and extract balance config - if taskPolicy, err := cp.LoadBalanceTaskPolicy(); err == nil && taskPolicy != nil { - if balanceConfig := taskPolicy.GetBalanceConfig(); balanceConfig != nil { - return balanceConfig, nil - } - } - - // Return default config if no valid config found - return &BalanceTaskConfig{ - ImbalanceThreshold: 0.1, - MinServerCount: 2, - }, nil -} - -// LoadBalanceTaskPolicy loads complete balance task policy from protobuf file -func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) { - if cp.dataDir == "" { - // Return default policy if no data directory - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 1, - RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds - CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ - BalanceConfig: &worker_pb.BalanceTaskConfig{ - ImbalanceThreshold: 0.1, - MinServerCount: 2, - }, - }, - }, nil - } - - confDir := filepath.Join(cp.dataDir, ConfigSubdir) - configPath := filepath.Join(confDir, BalanceTaskConfigFile) - - // Check if file exists - if _, err := os.Stat(configPath); os.IsNotExist(err) { - // Return default policy if file doesn't exist - return &worker_pb.TaskPolicy{ - Enabled: true, - MaxConcurrent: 1, - RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds - CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds - TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ - BalanceConfig: &worker_pb.BalanceTaskConfig{ - ImbalanceThreshold: 0.1, - MinServerCount: 2, - }, - }, - }, nil - } - - // Read file - configData, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read balance task config file: %w", err) - } - - // Try to unmarshal as TaskPolicy - var policy worker_pb.TaskPolicy - if err := proto.Unmarshal(configData, &policy); err == nil { - // Validate that it's actually a TaskPolicy with balance config - if policy.GetBalanceConfig() != nil { - glog.V(1).Infof("Loaded balance task policy from %s", configPath) - return &policy, nil - } - } - - return nil, fmt.Errorf("failed to unmarshal balance task configuration") -} - -// SaveReplicationTaskConfig saves replication task configuration to protobuf file -func (cp *ConfigPersistence) SaveReplicationTaskConfig(config *ReplicationTaskConfig) error { - return cp.saveTaskConfig(ReplicationTaskConfigFile, config) -} - -// LoadReplicationTaskConfig loads replication task configuration from protobuf file -func (cp *ConfigPersistence) LoadReplicationTaskConfig() (*ReplicationTaskConfig, error) { - var config ReplicationTaskConfig - err := cp.loadTaskConfig(ReplicationTaskConfigFile, &config) - if err != nil { - // Return default config if file doesn't exist - if os.IsNotExist(err) { - return &ReplicationTaskConfig{ - TargetReplicaCount: 1, - }, nil - } - return nil, err - } - return &config, nil -} - // saveTaskConfig is a generic helper for saving task configurations with both protobuf and JSON reference func (cp *ConfigPersistence) saveTaskConfig(filename string, config proto.Message) error { if cp.dataDir == "" { @@ -630,6 +330,44 @@ func (cp *ConfigPersistence) IsConfigured() bool { return cp.dataDir != "" } +// SaveTaskPolicyGeneric saves a task policy for any task type dynamically +func (cp *ConfigPersistence) SaveTaskPolicyGeneric(taskType string, policy *worker_pb.TaskPolicy) error { + filename := fmt.Sprintf("task_%s.pb", taskType) + return cp.saveTaskConfig(filename, policy) +} + +// LoadTaskPolicyGeneric loads a task policy for any task type dynamically +func (cp *ConfigPersistence) LoadTaskPolicyGeneric(taskType string) (*worker_pb.TaskPolicy, error) { + filename := fmt.Sprintf("task_%s.pb", taskType) + + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory configured") + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, filename) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + return nil, fmt.Errorf("no configuration found for task type: %s", taskType) + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read task config file: %w", err) + } + + // Unmarshal as TaskPolicy + var policy worker_pb.TaskPolicy + if err := proto.Unmarshal(configData, &policy); err != nil { + return nil, fmt.Errorf("failed to unmarshal task configuration: %w", err) + } + + glog.V(1).Infof("Loaded task policy for %s from %s", taskType, configPath) + return &policy, nil +} + // GetConfigInfo returns information about the configuration storage func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { info := map[string]interface{}{ @@ -664,70 +402,6 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { return info } -// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy -func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { - policy := &worker_pb.MaintenancePolicy{ - GlobalMaxConcurrent: 4, - DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds - DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds - TaskPolicies: make(map[string]*worker_pb.TaskPolicy), - } - - // Load vacuum task configuration - if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil { - policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{ - Enabled: vacuumConfig.Enabled, - MaxConcurrent: int32(vacuumConfig.MaxConcurrent), - RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), - CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), - TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ - VacuumConfig: &worker_pb.VacuumTaskConfig{ - GarbageThreshold: float64(vacuumConfig.GarbageThreshold), - MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours - MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds), - }, - }, - } - } - - // Load erasure coding task configuration - if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil { - policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{ - Enabled: ecConfig.Enabled, - MaxConcurrent: int32(ecConfig.MaxConcurrent), - RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds), - CheckIntervalSeconds: int32(ecConfig.ScanIntervalSeconds), - TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ - ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ - FullnessRatio: float64(ecConfig.FullnessRatio), - QuietForSeconds: int32(ecConfig.QuietForSeconds), - MinVolumeSizeMb: int32(ecConfig.MinSizeMB), - CollectionFilter: ecConfig.CollectionFilter, - }, - }, - } - } - - // Load balance task configuration - if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil { - policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{ - Enabled: balanceConfig.Enabled, - MaxConcurrent: int32(balanceConfig.MaxConcurrent), - RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), - CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), - TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ - BalanceConfig: &worker_pb.BalanceTaskConfig{ - ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold), - MinServerCount: int32(balanceConfig.MinServerCount), - }, - }, - } - } - - glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies)) - return policy -} - // SaveTaskDetail saves detailed task information to disk func (cp *ConfigPersistence) SaveTaskDetail(taskID string, detail *maintenance.TaskDetailData) error { if cp.dataDir == "" { |
