aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/dash/config_persistence.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/dash/config_persistence.go')
-rw-r--r--weed/admin/dash/config_persistence.go406
1 files changed, 40 insertions, 366 deletions
diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go
index 1fe1a9b42..75a6a86f2 100644
--- a/weed/admin/dash/config_persistence.go
+++ b/weed/admin/dash/config_persistence.go
@@ -12,9 +12,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
@@ -24,18 +21,10 @@ const (
ConfigSubdir = "conf"
// Configuration file names (protobuf binary)
- MaintenanceConfigFile = "maintenance.pb"
- VacuumTaskConfigFile = "task_vacuum.pb"
- ECTaskConfigFile = "task_erasure_coding.pb"
- BalanceTaskConfigFile = "task_balance.pb"
- ReplicationTaskConfigFile = "task_replication.pb"
+ MaintenanceConfigFile = "maintenance.pb"
// JSON reference files
- MaintenanceConfigJSONFile = "maintenance.json"
- VacuumTaskConfigJSONFile = "task_vacuum.json"
- ECTaskConfigJSONFile = "task_erasure_coding.json"
- BalanceTaskConfigJSONFile = "task_balance.json"
- ReplicationTaskConfigJSONFile = "task_replication.json"
+ MaintenanceConfigJSONFile = "maintenance.json"
// Task persistence subdirectories and settings
TasksSubdir = "tasks"
@@ -47,14 +36,6 @@ const (
ConfigFilePermissions = 0644
)
-// Task configuration types
-type (
- VacuumTaskConfig = worker_pb.VacuumTaskConfig
- ErasureCodingTaskConfig = worker_pb.ErasureCodingTaskConfig
- BalanceTaskConfig = worker_pb.BalanceTaskConfig
- ReplicationTaskConfig = worker_pb.ReplicationTaskConfig
-)
-
// isValidTaskID validates that a task ID is safe for use in file paths
// This prevents path traversal attacks by ensuring the task ID doesn't contain
// path separators or parent directory references
@@ -149,8 +130,6 @@ func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error)
if configData, err := os.ReadFile(configPath); err == nil {
var config MaintenanceConfig
if err := proto.Unmarshal(configData, &config); err == nil {
- // Always populate policy from separate task configuration files
- config.Policy = buildPolicyFromTaskConfigs()
return &config, nil
}
}
@@ -262,285 +241,6 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error {
return nil
}
-// SaveVacuumTaskConfig saves vacuum task configuration to protobuf file
-func (cp *ConfigPersistence) SaveVacuumTaskConfig(config *VacuumTaskConfig) error {
- return cp.saveTaskConfig(VacuumTaskConfigFile, config)
-}
-
-// SaveVacuumTaskPolicy saves complete vacuum task policy to protobuf file
-func (cp *ConfigPersistence) SaveVacuumTaskPolicy(policy *worker_pb.TaskPolicy) error {
- return cp.saveTaskConfig(VacuumTaskConfigFile, policy)
-}
-
-// LoadVacuumTaskConfig loads vacuum task configuration from protobuf file
-func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) {
- // Load as TaskPolicy and extract vacuum config
- if taskPolicy, err := cp.LoadVacuumTaskPolicy(); err == nil && taskPolicy != nil {
- if vacuumConfig := taskPolicy.GetVacuumConfig(); vacuumConfig != nil {
- return vacuumConfig, nil
- }
- }
-
- // Return default config if no valid config found
- return &VacuumTaskConfig{
- GarbageThreshold: 0.3,
- MinVolumeAgeHours: 24,
- MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
- }, nil
-}
-
-// LoadVacuumTaskPolicy loads complete vacuum task policy from protobuf file
-func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) {
- if cp.dataDir == "" {
- // Return default policy if no data directory
- return &worker_pb.TaskPolicy{
- Enabled: true,
- MaxConcurrent: 2,
- RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds
- CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
- TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
- VacuumConfig: &worker_pb.VacuumTaskConfig{
- GarbageThreshold: 0.3,
- MinVolumeAgeHours: 24,
- MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
- },
- },
- }, nil
- }
-
- confDir := filepath.Join(cp.dataDir, ConfigSubdir)
- configPath := filepath.Join(confDir, VacuumTaskConfigFile)
-
- // Check if file exists
- if _, err := os.Stat(configPath); os.IsNotExist(err) {
- // Return default policy if file doesn't exist
- return &worker_pb.TaskPolicy{
- Enabled: true,
- MaxConcurrent: 2,
- RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds
- CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
- TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
- VacuumConfig: &worker_pb.VacuumTaskConfig{
- GarbageThreshold: 0.3,
- MinVolumeAgeHours: 24,
- MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
- },
- },
- }, nil
- }
-
- // Read file
- configData, err := os.ReadFile(configPath)
- if err != nil {
- return nil, fmt.Errorf("failed to read vacuum task config file: %w", err)
- }
-
- // Try to unmarshal as TaskPolicy
- var policy worker_pb.TaskPolicy
- if err := proto.Unmarshal(configData, &policy); err == nil {
- // Validate that it's actually a TaskPolicy with vacuum config
- if policy.GetVacuumConfig() != nil {
- glog.V(1).Infof("Loaded vacuum task policy from %s", configPath)
- return &policy, nil
- }
- }
-
- return nil, fmt.Errorf("failed to unmarshal vacuum task configuration")
-}
-
-// SaveErasureCodingTaskConfig saves EC task configuration to protobuf file
-func (cp *ConfigPersistence) SaveErasureCodingTaskConfig(config *ErasureCodingTaskConfig) error {
- return cp.saveTaskConfig(ECTaskConfigFile, config)
-}
-
-// SaveErasureCodingTaskPolicy saves complete EC task policy to protobuf file
-func (cp *ConfigPersistence) SaveErasureCodingTaskPolicy(policy *worker_pb.TaskPolicy) error {
- return cp.saveTaskConfig(ECTaskConfigFile, policy)
-}
-
-// LoadErasureCodingTaskConfig loads EC task configuration from protobuf file
-func (cp *ConfigPersistence) LoadErasureCodingTaskConfig() (*ErasureCodingTaskConfig, error) {
- // Load as TaskPolicy and extract EC config
- if taskPolicy, err := cp.LoadErasureCodingTaskPolicy(); err == nil && taskPolicy != nil {
- if ecConfig := taskPolicy.GetErasureCodingConfig(); ecConfig != nil {
- return ecConfig, nil
- }
- }
-
- // Return default config if no valid config found
- return &ErasureCodingTaskConfig{
- FullnessRatio: 0.9,
- QuietForSeconds: 3600,
- MinVolumeSizeMb: 1024,
- CollectionFilter: "",
- }, nil
-}
-
-// LoadErasureCodingTaskPolicy loads complete EC task policy from protobuf file
-func (cp *ConfigPersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) {
- if cp.dataDir == "" {
- // Return default policy if no data directory
- return &worker_pb.TaskPolicy{
- Enabled: true,
- MaxConcurrent: 1,
- RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds
- CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds
- TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
- ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
- FullnessRatio: 0.9,
- QuietForSeconds: 3600,
- MinVolumeSizeMb: 1024,
- CollectionFilter: "",
- },
- },
- }, nil
- }
-
- confDir := filepath.Join(cp.dataDir, ConfigSubdir)
- configPath := filepath.Join(confDir, ECTaskConfigFile)
-
- // Check if file exists
- if _, err := os.Stat(configPath); os.IsNotExist(err) {
- // Return default policy if file doesn't exist
- return &worker_pb.TaskPolicy{
- Enabled: true,
- MaxConcurrent: 1,
- RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds
- CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds
- TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
- ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
- FullnessRatio: 0.9,
- QuietForSeconds: 3600,
- MinVolumeSizeMb: 1024,
- CollectionFilter: "",
- },
- },
- }, nil
- }
-
- // Read file
- configData, err := os.ReadFile(configPath)
- if err != nil {
- return nil, fmt.Errorf("failed to read EC task config file: %w", err)
- }
-
- // Try to unmarshal as TaskPolicy
- var policy worker_pb.TaskPolicy
- if err := proto.Unmarshal(configData, &policy); err == nil {
- // Validate that it's actually a TaskPolicy with EC config
- if policy.GetErasureCodingConfig() != nil {
- glog.V(1).Infof("Loaded EC task policy from %s", configPath)
- return &policy, nil
- }
- }
-
- return nil, fmt.Errorf("failed to unmarshal EC task configuration")
-}
-
-// SaveBalanceTaskConfig saves balance task configuration to protobuf file
-func (cp *ConfigPersistence) SaveBalanceTaskConfig(config *BalanceTaskConfig) error {
- return cp.saveTaskConfig(BalanceTaskConfigFile, config)
-}
-
-// SaveBalanceTaskPolicy saves complete balance task policy to protobuf file
-func (cp *ConfigPersistence) SaveBalanceTaskPolicy(policy *worker_pb.TaskPolicy) error {
- return cp.saveTaskConfig(BalanceTaskConfigFile, policy)
-}
-
-// LoadBalanceTaskConfig loads balance task configuration from protobuf file
-func (cp *ConfigPersistence) LoadBalanceTaskConfig() (*BalanceTaskConfig, error) {
- // Load as TaskPolicy and extract balance config
- if taskPolicy, err := cp.LoadBalanceTaskPolicy(); err == nil && taskPolicy != nil {
- if balanceConfig := taskPolicy.GetBalanceConfig(); balanceConfig != nil {
- return balanceConfig, nil
- }
- }
-
- // Return default config if no valid config found
- return &BalanceTaskConfig{
- ImbalanceThreshold: 0.1,
- MinServerCount: 2,
- }, nil
-}
-
-// LoadBalanceTaskPolicy loads complete balance task policy from protobuf file
-func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) {
- if cp.dataDir == "" {
- // Return default policy if no data directory
- return &worker_pb.TaskPolicy{
- Enabled: true,
- MaxConcurrent: 1,
- RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
- CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
- TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
- BalanceConfig: &worker_pb.BalanceTaskConfig{
- ImbalanceThreshold: 0.1,
- MinServerCount: 2,
- },
- },
- }, nil
- }
-
- confDir := filepath.Join(cp.dataDir, ConfigSubdir)
- configPath := filepath.Join(confDir, BalanceTaskConfigFile)
-
- // Check if file exists
- if _, err := os.Stat(configPath); os.IsNotExist(err) {
- // Return default policy if file doesn't exist
- return &worker_pb.TaskPolicy{
- Enabled: true,
- MaxConcurrent: 1,
- RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
- CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
- TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
- BalanceConfig: &worker_pb.BalanceTaskConfig{
- ImbalanceThreshold: 0.1,
- MinServerCount: 2,
- },
- },
- }, nil
- }
-
- // Read file
- configData, err := os.ReadFile(configPath)
- if err != nil {
- return nil, fmt.Errorf("failed to read balance task config file: %w", err)
- }
-
- // Try to unmarshal as TaskPolicy
- var policy worker_pb.TaskPolicy
- if err := proto.Unmarshal(configData, &policy); err == nil {
- // Validate that it's actually a TaskPolicy with balance config
- if policy.GetBalanceConfig() != nil {
- glog.V(1).Infof("Loaded balance task policy from %s", configPath)
- return &policy, nil
- }
- }
-
- return nil, fmt.Errorf("failed to unmarshal balance task configuration")
-}
-
-// SaveReplicationTaskConfig saves replication task configuration to protobuf file
-func (cp *ConfigPersistence) SaveReplicationTaskConfig(config *ReplicationTaskConfig) error {
- return cp.saveTaskConfig(ReplicationTaskConfigFile, config)
-}
-
-// LoadReplicationTaskConfig loads replication task configuration from protobuf file
-func (cp *ConfigPersistence) LoadReplicationTaskConfig() (*ReplicationTaskConfig, error) {
- var config ReplicationTaskConfig
- err := cp.loadTaskConfig(ReplicationTaskConfigFile, &config)
- if err != nil {
- // Return default config if file doesn't exist
- if os.IsNotExist(err) {
- return &ReplicationTaskConfig{
- TargetReplicaCount: 1,
- }, nil
- }
- return nil, err
- }
- return &config, nil
-}
-
// saveTaskConfig is a generic helper for saving task configurations with both protobuf and JSON reference
func (cp *ConfigPersistence) saveTaskConfig(filename string, config proto.Message) error {
if cp.dataDir == "" {
@@ -630,6 +330,44 @@ func (cp *ConfigPersistence) IsConfigured() bool {
return cp.dataDir != ""
}
+// SaveTaskPolicyGeneric saves a task policy for any task type dynamically
+func (cp *ConfigPersistence) SaveTaskPolicyGeneric(taskType string, policy *worker_pb.TaskPolicy) error {
+ filename := fmt.Sprintf("task_%s.pb", taskType)
+ return cp.saveTaskConfig(filename, policy)
+}
+
+// LoadTaskPolicyGeneric loads a task policy for any task type dynamically
+func (cp *ConfigPersistence) LoadTaskPolicyGeneric(taskType string) (*worker_pb.TaskPolicy, error) {
+ filename := fmt.Sprintf("task_%s.pb", taskType)
+
+ if cp.dataDir == "" {
+ return nil, fmt.Errorf("no data directory configured")
+ }
+
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, filename)
+
+ // Check if file exists
+ if _, err := os.Stat(configPath); os.IsNotExist(err) {
+ return nil, fmt.Errorf("no configuration found for task type: %s", taskType)
+ }
+
+ // Read file
+ configData, err := os.ReadFile(configPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read task config file: %w", err)
+ }
+
+ // Unmarshal as TaskPolicy
+ var policy worker_pb.TaskPolicy
+ if err := proto.Unmarshal(configData, &policy); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal task configuration: %w", err)
+ }
+
+ glog.V(1).Infof("Loaded task policy for %s from %s", taskType, configPath)
+ return &policy, nil
+}
+
// GetConfigInfo returns information about the configuration storage
func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
info := map[string]interface{}{
@@ -664,70 +402,6 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
return info
}
-// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
-func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
- policy := &worker_pb.MaintenancePolicy{
- GlobalMaxConcurrent: 4,
- DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
- DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
- TaskPolicies: make(map[string]*worker_pb.TaskPolicy),
- }
-
- // Load vacuum task configuration
- if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
- policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
- Enabled: vacuumConfig.Enabled,
- MaxConcurrent: int32(vacuumConfig.MaxConcurrent),
- RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
- CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
- TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
- VacuumConfig: &worker_pb.VacuumTaskConfig{
- GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
- MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
- MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
- },
- },
- }
- }
-
- // Load erasure coding task configuration
- if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
- policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
- Enabled: ecConfig.Enabled,
- MaxConcurrent: int32(ecConfig.MaxConcurrent),
- RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
- CheckIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
- TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
- ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
- FullnessRatio: float64(ecConfig.FullnessRatio),
- QuietForSeconds: int32(ecConfig.QuietForSeconds),
- MinVolumeSizeMb: int32(ecConfig.MinSizeMB),
- CollectionFilter: ecConfig.CollectionFilter,
- },
- },
- }
- }
-
- // Load balance task configuration
- if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
- policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
- Enabled: balanceConfig.Enabled,
- MaxConcurrent: int32(balanceConfig.MaxConcurrent),
- RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
- CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
- TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
- BalanceConfig: &worker_pb.BalanceTaskConfig{
- ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
- MinServerCount: int32(balanceConfig.MinServerCount),
- },
- },
- }
- }
-
- glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
- return policy
-}
-
// SaveTaskDetail saves detailed task information to disk
func (cp *ConfigPersistence) SaveTaskDetail(taskID string, detail *maintenance.TaskDetailData) error {
if cp.dataDir == "" {