aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/dash/config_persistence.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/dash/config_persistence.go')
-rw-r--r--weed/admin/dash/config_persistence.go632
1 files changed, 526 insertions, 106 deletions
diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go
index a2f74f4e7..b6b3074ab 100644
--- a/weed/admin/dash/config_persistence.go
+++ b/weed/admin/dash/config_persistence.go
@@ -1,23 +1,50 @@
package dash
import (
- "encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/proto"
)
const (
- // Configuration file names
- MaintenanceConfigFile = "maintenance.json"
- AdminConfigFile = "admin.json"
+ // Configuration subdirectory
+ ConfigSubdir = "conf"
+
+ // Configuration file names (protobuf binary)
+ MaintenanceConfigFile = "maintenance.pb"
+ VacuumTaskConfigFile = "task_vacuum.pb"
+ ECTaskConfigFile = "task_erasure_coding.pb"
+ BalanceTaskConfigFile = "task_balance.pb"
+ ReplicationTaskConfigFile = "task_replication.pb"
+
+ // JSON reference files
+ MaintenanceConfigJSONFile = "maintenance.json"
+ VacuumTaskConfigJSONFile = "task_vacuum.json"
+ ECTaskConfigJSONFile = "task_erasure_coding.json"
+ BalanceTaskConfigJSONFile = "task_balance.json"
+ ReplicationTaskConfigJSONFile = "task_replication.json"
+
ConfigDirPermissions = 0755
ConfigFilePermissions = 0644
)
+// Task configuration types
+type (
+ VacuumTaskConfig = worker_pb.VacuumTaskConfig
+ ErasureCodingTaskConfig = worker_pb.ErasureCodingTaskConfig
+ BalanceTaskConfig = worker_pb.BalanceTaskConfig
+ ReplicationTaskConfig = worker_pb.ReplicationTaskConfig
+)
+
// ConfigPersistence handles saving and loading configuration files
type ConfigPersistence struct {
dataDir string
@@ -30,122 +57,67 @@ func NewConfigPersistence(dataDir string) *ConfigPersistence {
}
}
-// SaveMaintenanceConfig saves maintenance configuration to JSON file
+// SaveMaintenanceConfig saves maintenance configuration to protobuf file and JSON reference
func (cp *ConfigPersistence) SaveMaintenanceConfig(config *MaintenanceConfig) error {
if cp.dataDir == "" {
return fmt.Errorf("no data directory specified, cannot save configuration")
}
- configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile)
-
- // Create directory if it doesn't exist
- if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil {
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ if err := os.MkdirAll(confDir, ConfigDirPermissions); err != nil {
return fmt.Errorf("failed to create config directory: %w", err)
}
- // Marshal configuration to JSON
- configData, err := json.MarshalIndent(config, "", " ")
- if err != nil {
- return fmt.Errorf("failed to marshal maintenance config: %w", err)
- }
-
- // Write to file
- if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil {
- return fmt.Errorf("failed to write maintenance config file: %w", err)
- }
-
- glog.V(1).Infof("Saved maintenance configuration to %s", configPath)
- return nil
-}
-
-// LoadMaintenanceConfig loads maintenance configuration from JSON file
-func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) {
- if cp.dataDir == "" {
- glog.V(1).Infof("No data directory specified, using default maintenance configuration")
- return DefaultMaintenanceConfig(), nil
- }
-
- configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile)
-
- // Check if file exists
- if _, err := os.Stat(configPath); os.IsNotExist(err) {
- glog.V(1).Infof("Maintenance config file does not exist, using defaults: %s", configPath)
- return DefaultMaintenanceConfig(), nil
- }
-
- // Read file
- configData, err := os.ReadFile(configPath)
+ // Save as protobuf (primary format)
+ pbConfigPath := filepath.Join(confDir, MaintenanceConfigFile)
+ pbData, err := proto.Marshal(config)
if err != nil {
- return nil, fmt.Errorf("failed to read maintenance config file: %w", err)
+ return fmt.Errorf("failed to marshal maintenance config to protobuf: %w", err)
}
- // Unmarshal JSON
- var config MaintenanceConfig
- if err := json.Unmarshal(configData, &config); err != nil {
- return nil, fmt.Errorf("failed to unmarshal maintenance config: %w", err)
+ if err := os.WriteFile(pbConfigPath, pbData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write protobuf config file: %w", err)
}
- glog.V(1).Infof("Loaded maintenance configuration from %s", configPath)
- return &config, nil
-}
-
-// SaveAdminConfig saves general admin configuration to JSON file
-func (cp *ConfigPersistence) SaveAdminConfig(config map[string]interface{}) error {
- if cp.dataDir == "" {
- return fmt.Errorf("no data directory specified, cannot save configuration")
- }
-
- configPath := filepath.Join(cp.dataDir, AdminConfigFile)
-
- // Create directory if it doesn't exist
- if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil {
- return fmt.Errorf("failed to create config directory: %w", err)
- }
-
- // Marshal configuration to JSON
- configData, err := json.MarshalIndent(config, "", " ")
+ // Save JSON reference copy for debugging
+ jsonConfigPath := filepath.Join(confDir, MaintenanceConfigJSONFile)
+ jsonData, err := protojson.MarshalOptions{
+ Multiline: true,
+ Indent: " ",
+ EmitUnpopulated: true,
+ }.Marshal(config)
if err != nil {
- return fmt.Errorf("failed to marshal admin config: %w", err)
+ return fmt.Errorf("failed to marshal maintenance config to JSON: %w", err)
}
- // Write to file
- if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil {
- return fmt.Errorf("failed to write admin config file: %w", err)
+ if err := os.WriteFile(jsonConfigPath, jsonData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write JSON reference file: %w", err)
}
- glog.V(1).Infof("Saved admin configuration to %s", configPath)
return nil
}
-// LoadAdminConfig loads general admin configuration from JSON file
-func (cp *ConfigPersistence) LoadAdminConfig() (map[string]interface{}, error) {
+// LoadMaintenanceConfig loads maintenance configuration from protobuf file
+func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) {
if cp.dataDir == "" {
- glog.V(1).Infof("No data directory specified, using default admin configuration")
- return make(map[string]interface{}), nil
- }
-
- configPath := filepath.Join(cp.dataDir, AdminConfigFile)
-
- // Check if file exists
- if _, err := os.Stat(configPath); os.IsNotExist(err) {
- glog.V(1).Infof("Admin config file does not exist, using defaults: %s", configPath)
- return make(map[string]interface{}), nil
+ return DefaultMaintenanceConfig(), nil
}
- // Read file
- configData, err := os.ReadFile(configPath)
- if err != nil {
- return nil, fmt.Errorf("failed to read admin config file: %w", err)
- }
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, MaintenanceConfigFile)
- // Unmarshal JSON
- var config map[string]interface{}
- if err := json.Unmarshal(configData, &config); err != nil {
- return nil, fmt.Errorf("failed to unmarshal admin config: %w", err)
+ // Try to load from protobuf file
+ if configData, err := os.ReadFile(configPath); err == nil {
+ var config MaintenanceConfig
+ if err := proto.Unmarshal(configData, &config); err == nil {
+ // Always populate policy from separate task configuration files
+ config.Policy = buildPolicyFromTaskConfigs()
+ return &config, nil
+ }
}
- glog.V(1).Infof("Loaded admin configuration from %s", configPath)
- return config, nil
+ // File doesn't exist or failed to load, use defaults
+ return DefaultMaintenanceConfig(), nil
}
// GetConfigPath returns the path to a configuration file
@@ -153,24 +125,35 @@ func (cp *ConfigPersistence) GetConfigPath(filename string) string {
if cp.dataDir == "" {
return ""
}
- return filepath.Join(cp.dataDir, filename)
+
+ // All configs go in conf subdirectory
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ return filepath.Join(confDir, filename)
}
-// ListConfigFiles returns all configuration files in the data directory
+// ListConfigFiles returns all configuration files in the conf subdirectory
func (cp *ConfigPersistence) ListConfigFiles() ([]string, error) {
if cp.dataDir == "" {
return nil, fmt.Errorf("no data directory specified")
}
- files, err := os.ReadDir(cp.dataDir)
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ files, err := os.ReadDir(confDir)
if err != nil {
+ // If conf directory doesn't exist, return empty list
+ if os.IsNotExist(err) {
+ return []string{}, nil
+ }
return nil, fmt.Errorf("failed to read config directory: %w", err)
}
var configFiles []string
for _, file := range files {
- if !file.IsDir() && filepath.Ext(file.Name()) == ".json" {
- configFiles = append(configFiles, file.Name())
+ if !file.IsDir() {
+ ext := filepath.Ext(file.Name())
+ if ext == ".json" || ext == ".pb" {
+ configFiles = append(configFiles, file.Name())
+ }
}
}
@@ -183,7 +166,7 @@ func (cp *ConfigPersistence) BackupConfig(filename string) error {
return fmt.Errorf("no data directory specified")
}
- configPath := filepath.Join(cp.dataDir, filename)
+ configPath := cp.GetConfigPath(filename)
if _, err := os.Stat(configPath); os.IsNotExist(err) {
return fmt.Errorf("config file does not exist: %s", filename)
}
@@ -191,7 +174,10 @@ func (cp *ConfigPersistence) BackupConfig(filename string) error {
// Create backup filename with timestamp
timestamp := time.Now().Format("2006-01-02_15-04-05")
backupName := fmt.Sprintf("%s.backup_%s", filename, timestamp)
- backupPath := filepath.Join(cp.dataDir, backupName)
+
+ // Determine backup directory (conf subdirectory)
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ backupPath := filepath.Join(confDir, backupName)
// Copy file
configData, err := os.ReadFile(configPath)
@@ -213,7 +199,10 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error {
return fmt.Errorf("no data directory specified")
}
- backupPath := filepath.Join(cp.dataDir, backupName)
+ // Determine backup path (conf subdirectory)
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ backupPath := filepath.Join(confDir, backupName)
+
if _, err := os.Stat(backupPath); os.IsNotExist(err) {
return fmt.Errorf("backup file does not exist: %s", backupName)
}
@@ -225,7 +214,7 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error {
}
// Write to config file
- configPath := filepath.Join(cp.dataDir, filename)
+ configPath := cp.GetConfigPath(filename)
if err := os.WriteFile(configPath, backupData, ConfigFilePermissions); err != nil {
return fmt.Errorf("failed to restore config: %w", err)
}
@@ -234,6 +223,364 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error {
return nil
}
+// SaveVacuumTaskConfig saves vacuum task configuration to protobuf file
+func (cp *ConfigPersistence) SaveVacuumTaskConfig(config *VacuumTaskConfig) error {
+ return cp.saveTaskConfig(VacuumTaskConfigFile, config)
+}
+
+// SaveVacuumTaskPolicy saves complete vacuum task policy to protobuf file
+func (cp *ConfigPersistence) SaveVacuumTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return cp.saveTaskConfig(VacuumTaskConfigFile, policy)
+}
+
+// LoadVacuumTaskConfig loads vacuum task configuration from protobuf file
+func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) {
+ // Load as TaskPolicy and extract vacuum config
+ if taskPolicy, err := cp.LoadVacuumTaskPolicy(); err == nil && taskPolicy != nil {
+ if vacuumConfig := taskPolicy.GetVacuumConfig(); vacuumConfig != nil {
+ return vacuumConfig, nil
+ }
+ }
+
+ // Return default config if no valid config found
+ return &VacuumTaskConfig{
+ GarbageThreshold: 0.3,
+ MinVolumeAgeHours: 24,
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ }, nil
+}
+
+// LoadVacuumTaskPolicy loads complete vacuum task policy from protobuf file
+func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) {
+ if cp.dataDir == "" {
+ // Return default policy if no data directory
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 2,
+ RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds
+ CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+ VacuumConfig: &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: 0.3,
+ MinVolumeAgeHours: 24,
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ },
+ },
+ }, nil
+ }
+
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, VacuumTaskConfigFile)
+
+ // Check if file exists
+ if _, err := os.Stat(configPath); os.IsNotExist(err) {
+ // Return default policy if file doesn't exist
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 2,
+ RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds
+ CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+ VacuumConfig: &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: 0.3,
+ MinVolumeAgeHours: 24,
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ },
+ },
+ }, nil
+ }
+
+ // Read file
+ configData, err := os.ReadFile(configPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read vacuum task config file: %w", err)
+ }
+
+ // Try to unmarshal as TaskPolicy
+ var policy worker_pb.TaskPolicy
+ if err := proto.Unmarshal(configData, &policy); err == nil {
+ // Validate that it's actually a TaskPolicy with vacuum config
+ if policy.GetVacuumConfig() != nil {
+ glog.V(1).Infof("Loaded vacuum task policy from %s", configPath)
+ return &policy, nil
+ }
+ }
+
+ return nil, fmt.Errorf("failed to unmarshal vacuum task configuration")
+}
+
+// SaveErasureCodingTaskConfig saves EC task configuration to protobuf file
+func (cp *ConfigPersistence) SaveErasureCodingTaskConfig(config *ErasureCodingTaskConfig) error {
+ return cp.saveTaskConfig(ECTaskConfigFile, config)
+}
+
+// SaveErasureCodingTaskPolicy saves complete EC task policy to protobuf file
+func (cp *ConfigPersistence) SaveErasureCodingTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return cp.saveTaskConfig(ECTaskConfigFile, policy)
+}
+
+// LoadErasureCodingTaskConfig loads EC task configuration from protobuf file
+func (cp *ConfigPersistence) LoadErasureCodingTaskConfig() (*ErasureCodingTaskConfig, error) {
+ // Load as TaskPolicy and extract EC config
+ if taskPolicy, err := cp.LoadErasureCodingTaskPolicy(); err == nil && taskPolicy != nil {
+ if ecConfig := taskPolicy.GetErasureCodingConfig(); ecConfig != nil {
+ return ecConfig, nil
+ }
+ }
+
+ // Return default config if no valid config found
+ return &ErasureCodingTaskConfig{
+ FullnessRatio: 0.9,
+ QuietForSeconds: 3600,
+ MinVolumeSizeMb: 1024,
+ CollectionFilter: "",
+ }, nil
+}
+
+// LoadErasureCodingTaskPolicy loads complete EC task policy from protobuf file
+func (cp *ConfigPersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) {
+ if cp.dataDir == "" {
+ // Return default policy if no data directory
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 1,
+ RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds
+ CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: 0.9,
+ QuietForSeconds: 3600,
+ MinVolumeSizeMb: 1024,
+ CollectionFilter: "",
+ },
+ },
+ }, nil
+ }
+
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, ECTaskConfigFile)
+
+ // Check if file exists
+ if _, err := os.Stat(configPath); os.IsNotExist(err) {
+ // Return default policy if file doesn't exist
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 1,
+ RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds
+ CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: 0.9,
+ QuietForSeconds: 3600,
+ MinVolumeSizeMb: 1024,
+ CollectionFilter: "",
+ },
+ },
+ }, nil
+ }
+
+ // Read file
+ configData, err := os.ReadFile(configPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read EC task config file: %w", err)
+ }
+
+ // Try to unmarshal as TaskPolicy
+ var policy worker_pb.TaskPolicy
+ if err := proto.Unmarshal(configData, &policy); err == nil {
+ // Validate that it's actually a TaskPolicy with EC config
+ if policy.GetErasureCodingConfig() != nil {
+ glog.V(1).Infof("Loaded EC task policy from %s", configPath)
+ return &policy, nil
+ }
+ }
+
+ return nil, fmt.Errorf("failed to unmarshal EC task configuration")
+}
+
+// SaveBalanceTaskConfig saves balance task configuration to protobuf file
+func (cp *ConfigPersistence) SaveBalanceTaskConfig(config *BalanceTaskConfig) error {
+ return cp.saveTaskConfig(BalanceTaskConfigFile, config)
+}
+
+// SaveBalanceTaskPolicy saves complete balance task policy to protobuf file
+func (cp *ConfigPersistence) SaveBalanceTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return cp.saveTaskConfig(BalanceTaskConfigFile, policy)
+}
+
+// LoadBalanceTaskConfig loads balance task configuration from protobuf file
+func (cp *ConfigPersistence) LoadBalanceTaskConfig() (*BalanceTaskConfig, error) {
+ // Load as TaskPolicy and extract balance config
+ if taskPolicy, err := cp.LoadBalanceTaskPolicy(); err == nil && taskPolicy != nil {
+ if balanceConfig := taskPolicy.GetBalanceConfig(); balanceConfig != nil {
+ return balanceConfig, nil
+ }
+ }
+
+ // Return default config if no valid config found
+ return &BalanceTaskConfig{
+ ImbalanceThreshold: 0.1,
+ MinServerCount: 2,
+ }, nil
+}
+
+// LoadBalanceTaskPolicy loads complete balance task policy from protobuf file
+func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) {
+ if cp.dataDir == "" {
+ // Return default policy if no data directory
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 1,
+ RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+ BalanceConfig: &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: 0.1,
+ MinServerCount: 2,
+ },
+ },
+ }, nil
+ }
+
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, BalanceTaskConfigFile)
+
+ // Check if file exists
+ if _, err := os.Stat(configPath); os.IsNotExist(err) {
+ // Return default policy if file doesn't exist
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 1,
+ RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+ BalanceConfig: &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: 0.1,
+ MinServerCount: 2,
+ },
+ },
+ }, nil
+ }
+
+ // Read file
+ configData, err := os.ReadFile(configPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read balance task config file: %w", err)
+ }
+
+ // Try to unmarshal as TaskPolicy
+ var policy worker_pb.TaskPolicy
+ if err := proto.Unmarshal(configData, &policy); err == nil {
+ // Validate that it's actually a TaskPolicy with balance config
+ if policy.GetBalanceConfig() != nil {
+ glog.V(1).Infof("Loaded balance task policy from %s", configPath)
+ return &policy, nil
+ }
+ }
+
+ return nil, fmt.Errorf("failed to unmarshal balance task configuration")
+}
+
+// SaveReplicationTaskConfig saves replication task configuration to protobuf file
+func (cp *ConfigPersistence) SaveReplicationTaskConfig(config *ReplicationTaskConfig) error {
+ return cp.saveTaskConfig(ReplicationTaskConfigFile, config)
+}
+
+// LoadReplicationTaskConfig loads replication task configuration from protobuf file
+func (cp *ConfigPersistence) LoadReplicationTaskConfig() (*ReplicationTaskConfig, error) {
+ var config ReplicationTaskConfig
+ err := cp.loadTaskConfig(ReplicationTaskConfigFile, &config)
+ if err != nil {
+ // Return default config if file doesn't exist
+ if os.IsNotExist(err) {
+ return &ReplicationTaskConfig{
+ TargetReplicaCount: 1,
+ }, nil
+ }
+ return nil, err
+ }
+ return &config, nil
+}
+
+// saveTaskConfig is a generic helper for saving task configurations with both protobuf and JSON reference
+func (cp *ConfigPersistence) saveTaskConfig(filename string, config proto.Message) error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot save task configuration")
+ }
+
+ // Create conf subdirectory path
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, filename)
+
+ // Generate JSON reference filename
+ jsonFilename := filename[:len(filename)-3] + ".json" // Replace .pb with .json
+ jsonPath := filepath.Join(confDir, jsonFilename)
+
+ // Create conf directory if it doesn't exist
+ if err := os.MkdirAll(confDir, ConfigDirPermissions); err != nil {
+ return fmt.Errorf("failed to create config directory: %w", err)
+ }
+
+ // Marshal configuration to protobuf binary format
+ configData, err := proto.Marshal(config)
+ if err != nil {
+ return fmt.Errorf("failed to marshal task config: %w", err)
+ }
+
+ // Write protobuf file
+ if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write task config file: %w", err)
+ }
+
+ // Marshal configuration to JSON for reference
+ marshaler := protojson.MarshalOptions{
+ Multiline: true,
+ Indent: " ",
+ EmitUnpopulated: true,
+ }
+ jsonData, err := marshaler.Marshal(config)
+ if err != nil {
+ glog.Warningf("Failed to marshal task config to JSON reference: %v", err)
+ } else {
+ // Write JSON reference file
+ if err := os.WriteFile(jsonPath, jsonData, ConfigFilePermissions); err != nil {
+ glog.Warningf("Failed to write task config JSON reference: %v", err)
+ }
+ }
+
+ glog.V(1).Infof("Saved task configuration to %s (with JSON reference)", configPath)
+ return nil
+}
+
+// loadTaskConfig is a generic helper for loading task configurations from conf subdirectory
+func (cp *ConfigPersistence) loadTaskConfig(filename string, config proto.Message) error {
+ if cp.dataDir == "" {
+ return os.ErrNotExist // Will trigger default config return
+ }
+
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, filename)
+
+ // Check if file exists
+ if _, err := os.Stat(configPath); os.IsNotExist(err) {
+ return err // Will trigger default config return
+ }
+
+ // Read file
+ configData, err := os.ReadFile(configPath)
+ if err != nil {
+ return fmt.Errorf("failed to read task config file: %w", err)
+ }
+
+ // Unmarshal protobuf binary data
+ if err := proto.Unmarshal(configData, config); err != nil {
+ return fmt.Errorf("failed to unmarshal task config: %w", err)
+ }
+
+ glog.V(1).Infof("Loaded task configuration from %s", configPath)
+ return nil
+}
+
// GetDataDir returns the data directory path
func (cp *ConfigPersistence) GetDataDir() string {
return cp.dataDir
@@ -249,6 +596,7 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
info := map[string]interface{}{
"data_dir_configured": cp.IsConfigured(),
"data_dir": cp.dataDir,
+ "config_subdir": ConfigSubdir,
}
if cp.IsConfigured() {
@@ -256,10 +604,18 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
if _, err := os.Stat(cp.dataDir); err == nil {
info["data_dir_exists"] = true
- // List config files
- configFiles, err := cp.ListConfigFiles()
- if err == nil {
- info["config_files"] = configFiles
+ // Check if conf subdirectory exists
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ if _, err := os.Stat(confDir); err == nil {
+ info["conf_dir_exists"] = true
+
+ // List config files
+ configFiles, err := cp.ListConfigFiles()
+ if err == nil {
+ info["config_files"] = configFiles
+ }
+ } else {
+ info["conf_dir_exists"] = false
}
} else {
info["data_dir_exists"] = false
@@ -268,3 +624,67 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
return info
}
+
+// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
+func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
+ policy := &worker_pb.MaintenancePolicy{
+ GlobalMaxConcurrent: 4,
+ DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
+ TaskPolicies: make(map[string]*worker_pb.TaskPolicy),
+ }
+
+ // Load vacuum task configuration
+ if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
+ policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
+ Enabled: vacuumConfig.Enabled,
+ MaxConcurrent: int32(vacuumConfig.MaxConcurrent),
+ RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+ VacuumConfig: &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
+ MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
+ MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
+ },
+ },
+ }
+ }
+
+ // Load erasure coding task configuration
+ if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
+ policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
+ Enabled: ecConfig.Enabled,
+ MaxConcurrent: int32(ecConfig.MaxConcurrent),
+ RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: float64(ecConfig.FullnessRatio),
+ QuietForSeconds: int32(ecConfig.QuietForSeconds),
+ MinVolumeSizeMb: int32(ecConfig.MinSizeMB),
+ CollectionFilter: ecConfig.CollectionFilter,
+ },
+ },
+ }
+ }
+
+ // Load balance task configuration
+ if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
+ policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
+ Enabled: balanceConfig.Enabled,
+ MaxConcurrent: int32(balanceConfig.MaxConcurrent),
+ RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+ BalanceConfig: &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
+ MinServerCount: int32(balanceConfig.MinServerCount),
+ },
+ },
+ }
+ }
+
+ glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
+ return policy
+}