aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/maintenance/maintenance_config_proto.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/maintenance/maintenance_config_proto.go')
-rw-r--r--weed/admin/maintenance/maintenance_config_proto.go287
1 files changed, 287 insertions, 0 deletions
diff --git a/weed/admin/maintenance/maintenance_config_proto.go b/weed/admin/maintenance/maintenance_config_proto.go
new file mode 100644
index 000000000..67a6b74be
--- /dev/null
+++ b/weed/admin/maintenance/maintenance_config_proto.go
@@ -0,0 +1,287 @@
+package maintenance
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+)
+
+// MaintenanceConfigManager handles protobuf-based configuration
+type MaintenanceConfigManager struct {
+ config *worker_pb.MaintenanceConfig
+}
+
+// NewMaintenanceConfigManager creates a new config manager with defaults
+func NewMaintenanceConfigManager() *MaintenanceConfigManager {
+ return &MaintenanceConfigManager{
+ config: DefaultMaintenanceConfigProto(),
+ }
+}
+
+// DefaultMaintenanceConfigProto returns default configuration as protobuf
+func DefaultMaintenanceConfigProto() *worker_pb.MaintenanceConfig {
+ return &worker_pb.MaintenanceConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 30 * 60, // 30 minutes
+ WorkerTimeoutSeconds: 5 * 60, // 5 minutes
+ TaskTimeoutSeconds: 2 * 60 * 60, // 2 hours
+ RetryDelaySeconds: 15 * 60, // 15 minutes
+ MaxRetries: 3,
+ CleanupIntervalSeconds: 24 * 60 * 60, // 24 hours
+ TaskRetentionSeconds: 7 * 24 * 60 * 60, // 7 days
+ // Policy field will be populated dynamically from separate task configuration files
+ Policy: nil,
+ }
+}
+
+// GetConfig returns the current configuration
+func (mcm *MaintenanceConfigManager) GetConfig() *worker_pb.MaintenanceConfig {
+ return mcm.config
+}
+
+// Type-safe configuration accessors
+
+// GetVacuumConfig returns vacuum-specific configuration for a task type
+func (mcm *MaintenanceConfigManager) GetVacuumConfig(taskType string) *worker_pb.VacuumTaskConfig {
+ if policy := mcm.getTaskPolicy(taskType); policy != nil {
+ if vacuumConfig := policy.GetVacuumConfig(); vacuumConfig != nil {
+ return vacuumConfig
+ }
+ }
+ // Return defaults if not configured
+ return &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: 0.3,
+ MinVolumeAgeHours: 24,
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ }
+}
+
+// GetErasureCodingConfig returns EC-specific configuration for a task type
+func (mcm *MaintenanceConfigManager) GetErasureCodingConfig(taskType string) *worker_pb.ErasureCodingTaskConfig {
+ if policy := mcm.getTaskPolicy(taskType); policy != nil {
+ if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil {
+ return ecConfig
+ }
+ }
+ // Return defaults if not configured
+ return &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: 0.95,
+ QuietForSeconds: 3600,
+ MinVolumeSizeMb: 100,
+ CollectionFilter: "",
+ }
+}
+
+// GetBalanceConfig returns balance-specific configuration for a task type
+func (mcm *MaintenanceConfigManager) GetBalanceConfig(taskType string) *worker_pb.BalanceTaskConfig {
+ if policy := mcm.getTaskPolicy(taskType); policy != nil {
+ if balanceConfig := policy.GetBalanceConfig(); balanceConfig != nil {
+ return balanceConfig
+ }
+ }
+ // Return defaults if not configured
+ return &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: 0.2,
+ MinServerCount: 2,
+ }
+}
+
+// GetReplicationConfig returns replication-specific configuration for a task type
+func (mcm *MaintenanceConfigManager) GetReplicationConfig(taskType string) *worker_pb.ReplicationTaskConfig {
+ if policy := mcm.getTaskPolicy(taskType); policy != nil {
+ if replicationConfig := policy.GetReplicationConfig(); replicationConfig != nil {
+ return replicationConfig
+ }
+ }
+ // Return defaults if not configured
+ return &worker_pb.ReplicationTaskConfig{
+ TargetReplicaCount: 2,
+ }
+}
+
+// Typed convenience methods for getting task configurations
+
+// GetVacuumTaskConfigForType returns vacuum configuration for a specific task type
+func (mcm *MaintenanceConfigManager) GetVacuumTaskConfigForType(taskType string) *worker_pb.VacuumTaskConfig {
+ return GetVacuumTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
+}
+
+// GetErasureCodingTaskConfigForType returns erasure coding configuration for a specific task type
+func (mcm *MaintenanceConfigManager) GetErasureCodingTaskConfigForType(taskType string) *worker_pb.ErasureCodingTaskConfig {
+ return GetErasureCodingTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
+}
+
+// GetBalanceTaskConfigForType returns balance configuration for a specific task type
+func (mcm *MaintenanceConfigManager) GetBalanceTaskConfigForType(taskType string) *worker_pb.BalanceTaskConfig {
+ return GetBalanceTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
+}
+
+// GetReplicationTaskConfigForType returns replication configuration for a specific task type
+func (mcm *MaintenanceConfigManager) GetReplicationTaskConfigForType(taskType string) *worker_pb.ReplicationTaskConfig {
+ return GetReplicationTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
+}
+
+// Helper methods
+
+func (mcm *MaintenanceConfigManager) getTaskPolicy(taskType string) *worker_pb.TaskPolicy {
+ if mcm.config.Policy != nil && mcm.config.Policy.TaskPolicies != nil {
+ return mcm.config.Policy.TaskPolicies[taskType]
+ }
+ return nil
+}
+
+// IsTaskEnabled returns whether a task type is enabled
+func (mcm *MaintenanceConfigManager) IsTaskEnabled(taskType string) bool {
+ if policy := mcm.getTaskPolicy(taskType); policy != nil {
+ return policy.Enabled
+ }
+ return false
+}
+
+// GetMaxConcurrent returns the max concurrent limit for a task type
+func (mcm *MaintenanceConfigManager) GetMaxConcurrent(taskType string) int32 {
+ if policy := mcm.getTaskPolicy(taskType); policy != nil {
+ return policy.MaxConcurrent
+ }
+ return 1 // Default
+}
+
+// GetRepeatInterval returns the repeat interval for a task type in seconds
+func (mcm *MaintenanceConfigManager) GetRepeatInterval(taskType string) int32 {
+ if policy := mcm.getTaskPolicy(taskType); policy != nil {
+ return policy.RepeatIntervalSeconds
+ }
+ return mcm.config.Policy.DefaultRepeatIntervalSeconds
+}
+
+// GetCheckInterval returns the check interval for a task type in seconds
+func (mcm *MaintenanceConfigManager) GetCheckInterval(taskType string) int32 {
+ if policy := mcm.getTaskPolicy(taskType); policy != nil {
+ return policy.CheckIntervalSeconds
+ }
+ return mcm.config.Policy.DefaultCheckIntervalSeconds
+}
+
+// Duration accessor methods
+
+// GetScanInterval returns the scan interval as a time.Duration
+func (mcm *MaintenanceConfigManager) GetScanInterval() time.Duration {
+ return time.Duration(mcm.config.ScanIntervalSeconds) * time.Second
+}
+
+// GetWorkerTimeout returns the worker timeout as a time.Duration
+func (mcm *MaintenanceConfigManager) GetWorkerTimeout() time.Duration {
+ return time.Duration(mcm.config.WorkerTimeoutSeconds) * time.Second
+}
+
+// GetTaskTimeout returns the task timeout as a time.Duration
+func (mcm *MaintenanceConfigManager) GetTaskTimeout() time.Duration {
+ return time.Duration(mcm.config.TaskTimeoutSeconds) * time.Second
+}
+
+// GetRetryDelay returns the retry delay as a time.Duration
+func (mcm *MaintenanceConfigManager) GetRetryDelay() time.Duration {
+ return time.Duration(mcm.config.RetryDelaySeconds) * time.Second
+}
+
+// GetCleanupInterval returns the cleanup interval as a time.Duration
+func (mcm *MaintenanceConfigManager) GetCleanupInterval() time.Duration {
+ return time.Duration(mcm.config.CleanupIntervalSeconds) * time.Second
+}
+
+// GetTaskRetention returns the task retention period as a time.Duration
+func (mcm *MaintenanceConfigManager) GetTaskRetention() time.Duration {
+ return time.Duration(mcm.config.TaskRetentionSeconds) * time.Second
+}
+
+// ValidateMaintenanceConfigWithSchema validates protobuf maintenance configuration using ConfigField rules
+func ValidateMaintenanceConfigWithSchema(config *worker_pb.MaintenanceConfig) error {
+ if config == nil {
+ return fmt.Errorf("configuration cannot be nil")
+ }
+
+ // Get the schema to access field validation rules
+ schema := GetMaintenanceConfigSchema()
+
+ // Validate each field individually using the ConfigField rules
+ if err := validateFieldWithSchema(schema, "enabled", config.Enabled); err != nil {
+ return err
+ }
+
+ if err := validateFieldWithSchema(schema, "scan_interval_seconds", int(config.ScanIntervalSeconds)); err != nil {
+ return err
+ }
+
+ if err := validateFieldWithSchema(schema, "worker_timeout_seconds", int(config.WorkerTimeoutSeconds)); err != nil {
+ return err
+ }
+
+ if err := validateFieldWithSchema(schema, "task_timeout_seconds", int(config.TaskTimeoutSeconds)); err != nil {
+ return err
+ }
+
+ if err := validateFieldWithSchema(schema, "retry_delay_seconds", int(config.RetryDelaySeconds)); err != nil {
+ return err
+ }
+
+ if err := validateFieldWithSchema(schema, "max_retries", int(config.MaxRetries)); err != nil {
+ return err
+ }
+
+ if err := validateFieldWithSchema(schema, "cleanup_interval_seconds", int(config.CleanupIntervalSeconds)); err != nil {
+ return err
+ }
+
+ if err := validateFieldWithSchema(schema, "task_retention_seconds", int(config.TaskRetentionSeconds)); err != nil {
+ return err
+ }
+
+ // Validate policy fields if present
+ if config.Policy != nil {
+ // Note: These field names might need to be adjusted based on the actual schema
+ if err := validatePolicyField("global_max_concurrent", int(config.Policy.GlobalMaxConcurrent)); err != nil {
+ return err
+ }
+
+ if err := validatePolicyField("default_repeat_interval_seconds", int(config.Policy.DefaultRepeatIntervalSeconds)); err != nil {
+ return err
+ }
+
+ if err := validatePolicyField("default_check_interval_seconds", int(config.Policy.DefaultCheckIntervalSeconds)); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// validateFieldWithSchema validates a single field using its ConfigField definition
+func validateFieldWithSchema(schema *MaintenanceConfigSchema, fieldName string, value interface{}) error {
+ field := schema.GetFieldByName(fieldName)
+ if field == nil {
+ // Field not in schema, skip validation
+ return nil
+ }
+
+ return field.ValidateValue(value)
+}
+
+// validatePolicyField validates policy fields (simplified validation for now)
+func validatePolicyField(fieldName string, value int) error {
+ switch fieldName {
+ case "global_max_concurrent":
+ if value < 1 || value > 20 {
+ return fmt.Errorf("Global Max Concurrent must be between 1 and 20, got %d", value)
+ }
+ case "default_repeat_interval":
+ if value < 1 || value > 168 {
+ return fmt.Errorf("Default Repeat Interval must be between 1 and 168 hours, got %d", value)
+ }
+ case "default_check_interval":
+ if value < 1 || value > 168 {
+ return fmt.Errorf("Default Check Interval must be between 1 and 168 hours, got %d", value)
+ }
+ }
+ return nil
+}