| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-30 12:38:03 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-30 12:38:03 -0700 |
| commit | 891a2fb6ebc324329f5330a140b8cacff3899db4 (patch) | |
| tree | d02aaa80a909e958aea831f206b3240b0237d7b7 /weed/admin/dash/config_persistence.go | |
| parent | 64198dad8346fe284cbef944fe01ff0d062c147d (diff) | |
| download | seaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.tar.xz seaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.zip | |
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design
* added simulation as tests
* reorganized the codebase to move the simulation framework and tests into their own dedicated package
* integration test. ec worker task
* remove "enhanced" reference
* start master, volume servers, filer
Current Status
✅ Master: Healthy and running (port 9333)
✅ Filer: Healthy and running (port 8888)
✅ Volume Servers: All 6 servers running (ports 8080-8085)
🔄 Admin/Workers: Will start when dependencies are ready
* generate write load
* tasks are assigned
* admin start with grpc port. worker has its own working directory
* Update .gitignore
* working worker and admin. Task detection is not working yet.
* compiles, detection uses volumeSizeLimitMB from master
* compiles
* worker retries connecting to admin
* build and restart
* rendering pending tasks
* skip task ID column
* sticky worker id
* test canScheduleTaskNow
* worker reconnect to admin
* clean up logs
* worker register itself first
* worker can run ec work and report status
but:
1. one volume should not be repeatedly worked on.
2. ec shards need to be distributed and source data should be deleted.
* move ec task logic
* listing ec shards
* local copy, ec. Need to distribute.
* ec is mostly working now
* distribution of ec shards needs improvement
* need configuration to enable ec
* show ec volumes
* interval field UI component
* rename
* integration test with vacuuming
* garbage percentage threshold
* fix warning
* display ec shard sizes
* fix ec volumes list
* Update ui.go
* show default values
* ensure correct default value
* MaintenanceConfig use ConfigField
* use schema defined defaults
* config
* reduce duplication
* refactor to use BaseUIProvider
* each task register its schema
* checkECEncodingCandidate use ecDetector
* use vacuumDetector
* use volumeSizeLimitMB
* remove
remove
* remove unused
* refactor
* use new framework
* remove v2 reference
* refactor
* left menu can scroll now
* The maintenance manager was not being initialized when no data directory was configured for persistent storage.
* saving config
* Update task_config_schema_templ.go
* enable/disable tasks
* protobuf-encoded task configurations (a sketch of this format follows the diffstat below)
* fix system settings
* use ui component
* remove logs
* interface{} Reduction
* reduce interface{}
* reduce interface{}
* avoid from/to map
* reduce interface{}
* refactor
* keep it DRY
* added logging
* debug messages
* debug level
* debug
* show the log caller line
* use configured task policy
* log level
* handle admin heartbeat response
* Update worker.go
* fix EC rack and dc count
* Report task status to admin server
* fix task logging, simplify interface checking, use erasure_coding constants
* factor in empty volume server during task planning
* volume.list adds disk id
* track disk id also
* fix locking for scheduled and manual scanning
* add active topology
* simplify task detector
* ec task completed, but shards are not showing up
* implement ec in ec_typed.go
* adjust log level
* dedup
* implementing ec copying shards and only ecx files
* use disk id when distributing ec shards (see the sketch after this commit message)
🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned
* Delete original volume from all locations
* clean up existing shard locations
* local encoding and distributing
* Update docker/admin_integration/EC-TESTING-README.md
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* check volume id range
* simplify
* fix tests
* fix types
* clean up logs and tests
---------
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
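As context for the disk-id bullets above, a minimal sketch of the planned flow using simplified stand-in types; apart from the DiskId hand-off called out in the commit, the type and field names here are illustrative, not the actual SeaweedFS API.

```go
// Stand-in types only: they mirror the roles of DestinationPlan, ECDestination
// and VolumeEcShardsCopyRequest described in the commit, not the real structs.
package main

import "fmt"

// DestinationPlan is what ActiveTopology produces during planning (simplified).
type DestinationPlan struct {
	TargetNode string
	TargetDisk uint32
}

// ECDestination is what the maintenance integration hands to the EC task (simplified).
type ECDestination struct {
	Node   string
	DiskId uint32
}

// VolumeEcShardsCopyRequest stands in for the gRPC request; only the disk id
// relevant to this change is shown.
type VolumeEcShardsCopyRequest struct {
	VolumeId uint32
	DiskId   uint32
}

func main() {
	plan := DestinationPlan{TargetNode: "volume-3:8080", TargetDisk: 1}
	dest := ECDestination{Node: plan.TargetNode, DiskId: plan.TargetDisk}
	req := VolumeEcShardsCopyRequest{VolumeId: 42, DiskId: dest.DiskId}

	// On the volume server side the request's disk id selects the store location
	// (vs.store.Locations[req.DiskId]), so shards land on the planned disk.
	fmt.Printf("copy shards of volume %d to %s, disk #%d\n", req.VolumeId, dest.Node, req.DiskId)
}
```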
Diffstat (limited to 'weed/admin/dash/config_persistence.go')
| -rw-r--r-- | weed/admin/dash/config_persistence.go | 632 |
1 file changed, 526 insertions, 106 deletions
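Before the diff itself, a minimal sketch of the dual-format persistence it introduces, assuming the generated worker_pb package: the .pb file is the primary copy and an indented protojson rendering is written next to it as a human-readable reference, mirroring saveTaskConfig below. The directory and file names here are illustrative.

```go
package main

import (
	"log"
	"os"
	"path/filepath"

	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

// saveWithJSONReference writes msg twice: the .pb file is the primary copy,
// the .json file is an indented protojson rendering kept for debugging.
func saveWithJSONReference(confDir, name string, msg proto.Message) error {
	if err := os.MkdirAll(confDir, 0755); err != nil {
		return err
	}
	pbData, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	if err := os.WriteFile(filepath.Join(confDir, name+".pb"), pbData, 0644); err != nil {
		return err
	}
	jsonData, err := protojson.MarshalOptions{Multiline: true, Indent: "  ", EmitUnpopulated: true}.Marshal(msg)
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(confDir, name+".json"), jsonData, 0644)
}

func main() {
	// Illustrative values only; the real defaults live in the code below.
	policy := &worker_pb.TaskPolicy{Enabled: true, MaxConcurrent: 2}
	if err := saveWithJSONReference("/tmp/seaweedfs-admin/conf", "task_vacuum", policy); err != nil {
		log.Fatal(err)
	}
}
```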
diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index a2f74f4e7..b6b3074ab 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -1,23 +1,50 @@ package dash import ( - "encoding/json" "fmt" "os" "path/filepath" "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" ) const ( - // Configuration file names - MaintenanceConfigFile = "maintenance.json" - AdminConfigFile = "admin.json" + // Configuration subdirectory + ConfigSubdir = "conf" + + // Configuration file names (protobuf binary) + MaintenanceConfigFile = "maintenance.pb" + VacuumTaskConfigFile = "task_vacuum.pb" + ECTaskConfigFile = "task_erasure_coding.pb" + BalanceTaskConfigFile = "task_balance.pb" + ReplicationTaskConfigFile = "task_replication.pb" + + // JSON reference files + MaintenanceConfigJSONFile = "maintenance.json" + VacuumTaskConfigJSONFile = "task_vacuum.json" + ECTaskConfigJSONFile = "task_erasure_coding.json" + BalanceTaskConfigJSONFile = "task_balance.json" + ReplicationTaskConfigJSONFile = "task_replication.json" + ConfigDirPermissions = 0755 ConfigFilePermissions = 0644 ) +// Task configuration types +type ( + VacuumTaskConfig = worker_pb.VacuumTaskConfig + ErasureCodingTaskConfig = worker_pb.ErasureCodingTaskConfig + BalanceTaskConfig = worker_pb.BalanceTaskConfig + ReplicationTaskConfig = worker_pb.ReplicationTaskConfig +) + // ConfigPersistence handles saving and loading configuration files type ConfigPersistence struct { dataDir string @@ -30,122 +57,67 @@ func NewConfigPersistence(dataDir string) *ConfigPersistence { } } -// SaveMaintenanceConfig saves maintenance configuration to JSON file +// SaveMaintenanceConfig saves maintenance configuration to protobuf file and JSON reference func (cp *ConfigPersistence) SaveMaintenanceConfig(config *MaintenanceConfig) error { if cp.dataDir == "" { return fmt.Errorf("no data directory specified, cannot save configuration") } - configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile) - - // Create directory if it doesn't exist - if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil { + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + if err := os.MkdirAll(confDir, ConfigDirPermissions); err != nil { return fmt.Errorf("failed to create config directory: %w", err) } - // Marshal configuration to JSON - configData, err := json.MarshalIndent(config, "", " ") - if err != nil { - return fmt.Errorf("failed to marshal maintenance config: %w", err) - } - - // Write to file - if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil { - return fmt.Errorf("failed to write maintenance config file: %w", err) - } - - glog.V(1).Infof("Saved maintenance configuration to %s", configPath) - return nil -} - -// LoadMaintenanceConfig loads maintenance configuration from JSON file -func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) { - if cp.dataDir == "" { - glog.V(1).Infof("No data directory specified, using default maintenance configuration") - return DefaultMaintenanceConfig(), nil - } - - configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile) - - // Check if file exists - if _, err := 
os.Stat(configPath); os.IsNotExist(err) { - glog.V(1).Infof("Maintenance config file does not exist, using defaults: %s", configPath) - return DefaultMaintenanceConfig(), nil - } - - // Read file - configData, err := os.ReadFile(configPath) + // Save as protobuf (primary format) + pbConfigPath := filepath.Join(confDir, MaintenanceConfigFile) + pbData, err := proto.Marshal(config) if err != nil { - return nil, fmt.Errorf("failed to read maintenance config file: %w", err) + return fmt.Errorf("failed to marshal maintenance config to protobuf: %w", err) } - // Unmarshal JSON - var config MaintenanceConfig - if err := json.Unmarshal(configData, &config); err != nil { - return nil, fmt.Errorf("failed to unmarshal maintenance config: %w", err) + if err := os.WriteFile(pbConfigPath, pbData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write protobuf config file: %w", err) } - glog.V(1).Infof("Loaded maintenance configuration from %s", configPath) - return &config, nil -} - -// SaveAdminConfig saves general admin configuration to JSON file -func (cp *ConfigPersistence) SaveAdminConfig(config map[string]interface{}) error { - if cp.dataDir == "" { - return fmt.Errorf("no data directory specified, cannot save configuration") - } - - configPath := filepath.Join(cp.dataDir, AdminConfigFile) - - // Create directory if it doesn't exist - if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil { - return fmt.Errorf("failed to create config directory: %w", err) - } - - // Marshal configuration to JSON - configData, err := json.MarshalIndent(config, "", " ") + // Save JSON reference copy for debugging + jsonConfigPath := filepath.Join(confDir, MaintenanceConfigJSONFile) + jsonData, err := protojson.MarshalOptions{ + Multiline: true, + Indent: " ", + EmitUnpopulated: true, + }.Marshal(config) if err != nil { - return fmt.Errorf("failed to marshal admin config: %w", err) + return fmt.Errorf("failed to marshal maintenance config to JSON: %w", err) } - // Write to file - if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil { - return fmt.Errorf("failed to write admin config file: %w", err) + if err := os.WriteFile(jsonConfigPath, jsonData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write JSON reference file: %w", err) } - glog.V(1).Infof("Saved admin configuration to %s", configPath) return nil } -// LoadAdminConfig loads general admin configuration from JSON file -func (cp *ConfigPersistence) LoadAdminConfig() (map[string]interface{}, error) { +// LoadMaintenanceConfig loads maintenance configuration from protobuf file +func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) { if cp.dataDir == "" { - glog.V(1).Infof("No data directory specified, using default admin configuration") - return make(map[string]interface{}), nil - } - - configPath := filepath.Join(cp.dataDir, AdminConfigFile) - - // Check if file exists - if _, err := os.Stat(configPath); os.IsNotExist(err) { - glog.V(1).Infof("Admin config file does not exist, using defaults: %s", configPath) - return make(map[string]interface{}), nil + return DefaultMaintenanceConfig(), nil } - // Read file - configData, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read admin config file: %w", err) - } + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, MaintenanceConfigFile) - // Unmarshal JSON - var config map[string]interface{} - if err := json.Unmarshal(configData, 
&config); err != nil { - return nil, fmt.Errorf("failed to unmarshal admin config: %w", err) + // Try to load from protobuf file + if configData, err := os.ReadFile(configPath); err == nil { + var config MaintenanceConfig + if err := proto.Unmarshal(configData, &config); err == nil { + // Always populate policy from separate task configuration files + config.Policy = buildPolicyFromTaskConfigs() + return &config, nil + } } - glog.V(1).Infof("Loaded admin configuration from %s", configPath) - return config, nil + // File doesn't exist or failed to load, use defaults + return DefaultMaintenanceConfig(), nil } // GetConfigPath returns the path to a configuration file @@ -153,24 +125,35 @@ func (cp *ConfigPersistence) GetConfigPath(filename string) string { if cp.dataDir == "" { return "" } - return filepath.Join(cp.dataDir, filename) + + // All configs go in conf subdirectory + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + return filepath.Join(confDir, filename) } -// ListConfigFiles returns all configuration files in the data directory +// ListConfigFiles returns all configuration files in the conf subdirectory func (cp *ConfigPersistence) ListConfigFiles() ([]string, error) { if cp.dataDir == "" { return nil, fmt.Errorf("no data directory specified") } - files, err := os.ReadDir(cp.dataDir) + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + files, err := os.ReadDir(confDir) if err != nil { + // If conf directory doesn't exist, return empty list + if os.IsNotExist(err) { + return []string{}, nil + } return nil, fmt.Errorf("failed to read config directory: %w", err) } var configFiles []string for _, file := range files { - if !file.IsDir() && filepath.Ext(file.Name()) == ".json" { - configFiles = append(configFiles, file.Name()) + if !file.IsDir() { + ext := filepath.Ext(file.Name()) + if ext == ".json" || ext == ".pb" { + configFiles = append(configFiles, file.Name()) + } } } @@ -183,7 +166,7 @@ func (cp *ConfigPersistence) BackupConfig(filename string) error { return fmt.Errorf("no data directory specified") } - configPath := filepath.Join(cp.dataDir, filename) + configPath := cp.GetConfigPath(filename) if _, err := os.Stat(configPath); os.IsNotExist(err) { return fmt.Errorf("config file does not exist: %s", filename) } @@ -191,7 +174,10 @@ func (cp *ConfigPersistence) BackupConfig(filename string) error { // Create backup filename with timestamp timestamp := time.Now().Format("2006-01-02_15-04-05") backupName := fmt.Sprintf("%s.backup_%s", filename, timestamp) - backupPath := filepath.Join(cp.dataDir, backupName) + + // Determine backup directory (conf subdirectory) + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + backupPath := filepath.Join(confDir, backupName) // Copy file configData, err := os.ReadFile(configPath) @@ -213,7 +199,10 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { return fmt.Errorf("no data directory specified") } - backupPath := filepath.Join(cp.dataDir, backupName) + // Determine backup path (conf subdirectory) + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + backupPath := filepath.Join(confDir, backupName) + if _, err := os.Stat(backupPath); os.IsNotExist(err) { return fmt.Errorf("backup file does not exist: %s", backupName) } @@ -225,7 +214,7 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { } // Write to config file - configPath := filepath.Join(cp.dataDir, filename) + configPath := cp.GetConfigPath(filename) if err := os.WriteFile(configPath, backupData, 
ConfigFilePermissions); err != nil { return fmt.Errorf("failed to restore config: %w", err) } @@ -234,6 +223,364 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { return nil } +// SaveVacuumTaskConfig saves vacuum task configuration to protobuf file +func (cp *ConfigPersistence) SaveVacuumTaskConfig(config *VacuumTaskConfig) error { + return cp.saveTaskConfig(VacuumTaskConfigFile, config) +} + +// SaveVacuumTaskPolicy saves complete vacuum task policy to protobuf file +func (cp *ConfigPersistence) SaveVacuumTaskPolicy(policy *worker_pb.TaskPolicy) error { + return cp.saveTaskConfig(VacuumTaskConfigFile, policy) +} + +// LoadVacuumTaskConfig loads vacuum task configuration from protobuf file +func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) { + // Load as TaskPolicy and extract vacuum config + if taskPolicy, err := cp.LoadVacuumTaskPolicy(); err == nil && taskPolicy != nil { + if vacuumConfig := taskPolicy.GetVacuumConfig(); vacuumConfig != nil { + return vacuumConfig, nil + } + } + + // Return default config if no valid config found + return &VacuumTaskConfig{ + GarbageThreshold: 0.3, + MinVolumeAgeHours: 24, + MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days + }, nil +} + +// LoadVacuumTaskPolicy loads complete vacuum task policy from protobuf file +func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) { + if cp.dataDir == "" { + // Return default policy if no data directory + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 2, + RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds + CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ + VacuumConfig: &worker_pb.VacuumTaskConfig{ + GarbageThreshold: 0.3, + MinVolumeAgeHours: 24, + MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days + }, + }, + }, nil + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, VacuumTaskConfigFile) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + // Return default policy if file doesn't exist + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 2, + RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds + CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ + VacuumConfig: &worker_pb.VacuumTaskConfig{ + GarbageThreshold: 0.3, + MinVolumeAgeHours: 24, + MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days + }, + }, + }, nil + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read vacuum task config file: %w", err) + } + + // Try to unmarshal as TaskPolicy + var policy worker_pb.TaskPolicy + if err := proto.Unmarshal(configData, &policy); err == nil { + // Validate that it's actually a TaskPolicy with vacuum config + if policy.GetVacuumConfig() != nil { + glog.V(1).Infof("Loaded vacuum task policy from %s", configPath) + return &policy, nil + } + } + + return nil, fmt.Errorf("failed to unmarshal vacuum task configuration") +} + +// SaveErasureCodingTaskConfig saves EC task configuration to protobuf file +func (cp *ConfigPersistence) SaveErasureCodingTaskConfig(config *ErasureCodingTaskConfig) error { + return cp.saveTaskConfig(ECTaskConfigFile, config) +} + +// SaveErasureCodingTaskPolicy saves complete EC task policy to protobuf file +func (cp *ConfigPersistence) SaveErasureCodingTaskPolicy(policy *worker_pb.TaskPolicy) error { + return 
cp.saveTaskConfig(ECTaskConfigFile, policy) +} + +// LoadErasureCodingTaskConfig loads EC task configuration from protobuf file +func (cp *ConfigPersistence) LoadErasureCodingTaskConfig() (*ErasureCodingTaskConfig, error) { + // Load as TaskPolicy and extract EC config + if taskPolicy, err := cp.LoadErasureCodingTaskPolicy(); err == nil && taskPolicy != nil { + if ecConfig := taskPolicy.GetErasureCodingConfig(); ecConfig != nil { + return ecConfig, nil + } + } + + // Return default config if no valid config found + return &ErasureCodingTaskConfig{ + FullnessRatio: 0.9, + QuietForSeconds: 3600, + MinVolumeSizeMb: 1024, + CollectionFilter: "", + }, nil +} + +// LoadErasureCodingTaskPolicy loads complete EC task policy from protobuf file +func (cp *ConfigPersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) { + if cp.dataDir == "" { + // Return default policy if no data directory + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 1, + RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds + CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ + ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ + FullnessRatio: 0.9, + QuietForSeconds: 3600, + MinVolumeSizeMb: 1024, + CollectionFilter: "", + }, + }, + }, nil + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, ECTaskConfigFile) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + // Return default policy if file doesn't exist + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 1, + RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds + CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ + ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ + FullnessRatio: 0.9, + QuietForSeconds: 3600, + MinVolumeSizeMb: 1024, + CollectionFilter: "", + }, + }, + }, nil + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read EC task config file: %w", err) + } + + // Try to unmarshal as TaskPolicy + var policy worker_pb.TaskPolicy + if err := proto.Unmarshal(configData, &policy); err == nil { + // Validate that it's actually a TaskPolicy with EC config + if policy.GetErasureCodingConfig() != nil { + glog.V(1).Infof("Loaded EC task policy from %s", configPath) + return &policy, nil + } + } + + return nil, fmt.Errorf("failed to unmarshal EC task configuration") +} + +// SaveBalanceTaskConfig saves balance task configuration to protobuf file +func (cp *ConfigPersistence) SaveBalanceTaskConfig(config *BalanceTaskConfig) error { + return cp.saveTaskConfig(BalanceTaskConfigFile, config) +} + +// SaveBalanceTaskPolicy saves complete balance task policy to protobuf file +func (cp *ConfigPersistence) SaveBalanceTaskPolicy(policy *worker_pb.TaskPolicy) error { + return cp.saveTaskConfig(BalanceTaskConfigFile, policy) +} + +// LoadBalanceTaskConfig loads balance task configuration from protobuf file +func (cp *ConfigPersistence) LoadBalanceTaskConfig() (*BalanceTaskConfig, error) { + // Load as TaskPolicy and extract balance config + if taskPolicy, err := cp.LoadBalanceTaskPolicy(); err == nil && taskPolicy != nil { + if balanceConfig := taskPolicy.GetBalanceConfig(); balanceConfig != nil { + return balanceConfig, nil + } + } + + // Return default config if no valid config found + return &BalanceTaskConfig{ + ImbalanceThreshold: 0.1, + 
MinServerCount: 2, + }, nil +} + +// LoadBalanceTaskPolicy loads complete balance task policy from protobuf file +func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) { + if cp.dataDir == "" { + // Return default policy if no data directory + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 1, + RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds + CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ + BalanceConfig: &worker_pb.BalanceTaskConfig{ + ImbalanceThreshold: 0.1, + MinServerCount: 2, + }, + }, + }, nil + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, BalanceTaskConfigFile) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + // Return default policy if file doesn't exist + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 1, + RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds + CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ + BalanceConfig: &worker_pb.BalanceTaskConfig{ + ImbalanceThreshold: 0.1, + MinServerCount: 2, + }, + }, + }, nil + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read balance task config file: %w", err) + } + + // Try to unmarshal as TaskPolicy + var policy worker_pb.TaskPolicy + if err := proto.Unmarshal(configData, &policy); err == nil { + // Validate that it's actually a TaskPolicy with balance config + if policy.GetBalanceConfig() != nil { + glog.V(1).Infof("Loaded balance task policy from %s", configPath) + return &policy, nil + } + } + + return nil, fmt.Errorf("failed to unmarshal balance task configuration") +} + +// SaveReplicationTaskConfig saves replication task configuration to protobuf file +func (cp *ConfigPersistence) SaveReplicationTaskConfig(config *ReplicationTaskConfig) error { + return cp.saveTaskConfig(ReplicationTaskConfigFile, config) +} + +// LoadReplicationTaskConfig loads replication task configuration from protobuf file +func (cp *ConfigPersistence) LoadReplicationTaskConfig() (*ReplicationTaskConfig, error) { + var config ReplicationTaskConfig + err := cp.loadTaskConfig(ReplicationTaskConfigFile, &config) + if err != nil { + // Return default config if file doesn't exist + if os.IsNotExist(err) { + return &ReplicationTaskConfig{ + TargetReplicaCount: 1, + }, nil + } + return nil, err + } + return &config, nil +} + +// saveTaskConfig is a generic helper for saving task configurations with both protobuf and JSON reference +func (cp *ConfigPersistence) saveTaskConfig(filename string, config proto.Message) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save task configuration") + } + + // Create conf subdirectory path + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, filename) + + // Generate JSON reference filename + jsonFilename := filename[:len(filename)-3] + ".json" // Replace .pb with .json + jsonPath := filepath.Join(confDir, jsonFilename) + + // Create conf directory if it doesn't exist + if err := os.MkdirAll(confDir, ConfigDirPermissions); err != nil { + return fmt.Errorf("failed to create config directory: %w", err) + } + + // Marshal configuration to protobuf binary format + configData, err := proto.Marshal(config) + if err != nil { + return fmt.Errorf("failed to marshal task config: %w", err) + } + + // Write 
protobuf file + if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write task config file: %w", err) + } + + // Marshal configuration to JSON for reference + marshaler := protojson.MarshalOptions{ + Multiline: true, + Indent: " ", + EmitUnpopulated: true, + } + jsonData, err := marshaler.Marshal(config) + if err != nil { + glog.Warningf("Failed to marshal task config to JSON reference: %v", err) + } else { + // Write JSON reference file + if err := os.WriteFile(jsonPath, jsonData, ConfigFilePermissions); err != nil { + glog.Warningf("Failed to write task config JSON reference: %v", err) + } + } + + glog.V(1).Infof("Saved task configuration to %s (with JSON reference)", configPath) + return nil +} + +// loadTaskConfig is a generic helper for loading task configurations from conf subdirectory +func (cp *ConfigPersistence) loadTaskConfig(filename string, config proto.Message) error { + if cp.dataDir == "" { + return os.ErrNotExist // Will trigger default config return + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, filename) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + return err // Will trigger default config return + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return fmt.Errorf("failed to read task config file: %w", err) + } + + // Unmarshal protobuf binary data + if err := proto.Unmarshal(configData, config); err != nil { + return fmt.Errorf("failed to unmarshal task config: %w", err) + } + + glog.V(1).Infof("Loaded task configuration from %s", configPath) + return nil +} + // GetDataDir returns the data directory path func (cp *ConfigPersistence) GetDataDir() string { return cp.dataDir @@ -249,6 +596,7 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { info := map[string]interface{}{ "data_dir_configured": cp.IsConfigured(), "data_dir": cp.dataDir, + "config_subdir": ConfigSubdir, } if cp.IsConfigured() { @@ -256,10 +604,18 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { if _, err := os.Stat(cp.dataDir); err == nil { info["data_dir_exists"] = true - // List config files - configFiles, err := cp.ListConfigFiles() - if err == nil { - info["config_files"] = configFiles + // Check if conf subdirectory exists + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + if _, err := os.Stat(confDir); err == nil { + info["conf_dir_exists"] = true + + // List config files + configFiles, err := cp.ListConfigFiles() + if err == nil { + info["config_files"] = configFiles + } + } else { + info["conf_dir_exists"] = false } } else { info["data_dir_exists"] = false @@ -268,3 +624,67 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { return info } + +// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy +func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { + policy := &worker_pb.MaintenancePolicy{ + GlobalMaxConcurrent: 4, + DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds + DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds + TaskPolicies: make(map[string]*worker_pb.TaskPolicy), + } + + // Load vacuum task configuration + if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil { + policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{ + Enabled: vacuumConfig.Enabled, + MaxConcurrent: int32(vacuumConfig.MaxConcurrent), + 
RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), + CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ + VacuumConfig: &worker_pb.VacuumTaskConfig{ + GarbageThreshold: float64(vacuumConfig.GarbageThreshold), + MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours + MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds), + }, + }, + } + } + + // Load erasure coding task configuration + if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil { + policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{ + Enabled: ecConfig.Enabled, + MaxConcurrent: int32(ecConfig.MaxConcurrent), + RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds), + CheckIntervalSeconds: int32(ecConfig.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ + ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ + FullnessRatio: float64(ecConfig.FullnessRatio), + QuietForSeconds: int32(ecConfig.QuietForSeconds), + MinVolumeSizeMb: int32(ecConfig.MinSizeMB), + CollectionFilter: ecConfig.CollectionFilter, + }, + }, + } + } + + // Load balance task configuration + if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil { + policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{ + Enabled: balanceConfig.Enabled, + MaxConcurrent: int32(balanceConfig.MaxConcurrent), + RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), + CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ + BalanceConfig: &worker_pb.BalanceTaskConfig{ + ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold), + MinServerCount: int32(balanceConfig.MinServerCount), + }, + }, + } + } + + glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies)) + return policy +} |
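A usage sketch of the loading path above, assuming an admin-server call site that imports the dash package from this diff; the data directory path is illustrative. Missing or unreadable .pb files fall back to built-in defaults, so callers always receive a usable policy.

```go
package main

import (
	"fmt"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/admin/dash"
)

func main() {
	// Point at the admin server's data directory; configs live in its "conf" subdirectory.
	cp := dash.NewConfigPersistence("/var/lib/seaweedfs-admin") // hypothetical path

	// If conf/task_vacuum.pb is missing or unreadable, built-in defaults are returned.
	policy, err := cp.LoadVacuumTaskPolicy()
	if err != nil {
		log.Fatalf("loading vacuum task policy: %v", err)
	}
	if vc := policy.GetVacuumConfig(); vc != nil {
		fmt.Printf("vacuum enabled=%v, garbage threshold=%.2f\n", policy.Enabled, vc.GarbageThreshold)
	}
}
```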
