diff options
Diffstat (limited to 'weed/admin/dash/admin_server.go')
| -rw-r--r-- | weed/admin/dash/admin_server.go | 321 |
1 files changed, 279 insertions, 42 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 6ebade19f..376f3edc7 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/s3api" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" ) type AdminServer struct { @@ -126,30 +127,67 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string) } } - // Initialize maintenance system with persistent configuration + // Initialize maintenance system - always initialize even without persistent storage + var maintenanceConfig *maintenance.MaintenanceConfig if server.configPersistence.IsConfigured() { - maintenanceConfig, err := server.configPersistence.LoadMaintenanceConfig() + var err error + maintenanceConfig, err = server.configPersistence.LoadMaintenanceConfig() if err != nil { glog.Errorf("Failed to load maintenance configuration: %v", err) maintenanceConfig = maintenance.DefaultMaintenanceConfig() } - server.InitMaintenanceManager(maintenanceConfig) - // Start maintenance manager if enabled - if maintenanceConfig.Enabled { - go func() { - if err := server.StartMaintenanceManager(); err != nil { - glog.Errorf("Failed to start maintenance manager: %v", err) - } - }() + // Apply new defaults to handle schema changes (like enabling by default) + schema := maintenance.GetMaintenanceConfigSchema() + if err := schema.ApplyDefaultsToProtobuf(maintenanceConfig); err != nil { + glog.Warningf("Failed to apply schema defaults to loaded config: %v", err) + } + + // Force enable maintenance system for new default behavior + // This handles the case where old configs had Enabled=false as default + if !maintenanceConfig.Enabled { + glog.V(1).Infof("Enabling maintenance system (new default behavior)") + maintenanceConfig.Enabled = true } + + glog.V(1).Infof("Maintenance system initialized with persistent configuration (enabled: %v)", maintenanceConfig.Enabled) } else { - glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode") + maintenanceConfig = maintenance.DefaultMaintenanceConfig() + glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode (enabled: %v)", maintenanceConfig.Enabled) + } + + // Always initialize maintenance manager + server.InitMaintenanceManager(maintenanceConfig) + + // Load saved task configurations from persistence + server.loadTaskConfigurationsFromPersistence() + + // Start maintenance manager if enabled + if maintenanceConfig.Enabled { + go func() { + // Give master client a bit of time to connect before starting scans + time.Sleep(2 * time.Second) + if err := server.StartMaintenanceManager(); err != nil { + glog.Errorf("Failed to start maintenance manager: %v", err) + } + }() } return server } +// loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files +func (s *AdminServer) loadTaskConfigurationsFromPersistence() { + if s.configPersistence == nil || !s.configPersistence.IsConfigured() { + glog.V(1).Infof("Config persistence not available, using default task configurations") + return + } + + // Load task configurations dynamically using the config update registry + configUpdateRegistry := tasks.GetGlobalConfigUpdateRegistry() + configUpdateRegistry.UpdateAllConfigs(s.configPersistence) +} + // GetCredentialManager returns the credential manager func (s *AdminServer) GetCredentialManager() *credential.CredentialManager { return s.credentialManager @@ -852,6 +890,15 @@ func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"success": true, "message": "Task cancelled"}) } +// cancelMaintenanceTask cancels a pending maintenance task +func (as *AdminServer) cancelMaintenanceTask(taskID string) error { + if as.maintenanceManager == nil { + return fmt.Errorf("maintenance manager not initialized") + } + + return as.maintenanceManager.CancelTask(taskID) +} + // GetMaintenanceWorkersAPI returns all maintenance workers func (as *AdminServer) GetMaintenanceWorkersAPI(c *gin.Context) { workers, err := as.getMaintenanceWorkers() @@ -899,13 +946,21 @@ func (as *AdminServer) GetMaintenanceConfigAPI(c *gin.Context) { // UpdateMaintenanceConfigAPI updates maintenance configuration via API func (as *AdminServer) UpdateMaintenanceConfigAPI(c *gin.Context) { - var config MaintenanceConfig - if err := c.ShouldBindJSON(&config); err != nil { + // Parse JSON into a generic map first to handle type conversions + var jsonConfig map[string]interface{} + if err := c.ShouldBindJSON(&jsonConfig); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - err := as.updateMaintenanceConfig(&config) + // Convert JSON map to protobuf configuration + config, err := convertJSONToMaintenanceConfig(jsonConfig) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse configuration: " + err.Error()}) + return + } + + err = as.updateMaintenanceConfig(config) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return @@ -951,17 +1006,36 @@ func (as *AdminServer) getMaintenanceQueueData() (*maintenance.MaintenanceQueueD }, nil } +// GetMaintenanceQueueStats returns statistics for the maintenance queue (exported for handlers) +func (as *AdminServer) GetMaintenanceQueueStats() (*maintenance.QueueStats, error) { + return as.getMaintenanceQueueStats() +} + // getMaintenanceQueueStats returns statistics for the maintenance queue func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, error) { - // This would integrate with the maintenance queue to get real statistics - // For now, return mock data - return &maintenance.QueueStats{ - PendingTasks: 5, - RunningTasks: 2, - CompletedToday: 15, - FailedToday: 1, - TotalTasks: 23, - }, nil + if as.maintenanceManager == nil { + return &maintenance.QueueStats{ + PendingTasks: 0, + RunningTasks: 0, + CompletedToday: 0, + FailedToday: 0, + TotalTasks: 0, + }, nil + } + + // Get real statistics from maintenance manager + stats := as.maintenanceManager.GetStats() + + // Convert MaintenanceStats to QueueStats + queueStats := &maintenance.QueueStats{ + PendingTasks: stats.TasksByStatus[maintenance.TaskStatusPending], + RunningTasks: stats.TasksByStatus[maintenance.TaskStatusAssigned] + stats.TasksByStatus[maintenance.TaskStatusInProgress], + CompletedToday: stats.CompletedToday, + FailedToday: stats.FailedToday, + TotalTasks: stats.TotalTasks, + } + + return queueStats, nil } // getMaintenanceTasks returns all maintenance tasks @@ -1000,15 +1074,6 @@ func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, erro return nil, fmt.Errorf("task %s not found", taskID) } -// cancelMaintenanceTask cancels a pending maintenance task -func (as *AdminServer) cancelMaintenanceTask(taskID string) error { - if as.maintenanceManager == nil { - return fmt.Errorf("maintenance manager not initialized") - } - - return as.maintenanceManager.CancelTask(taskID) -} - // getMaintenanceWorkers returns all maintenance workers func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) { if as.maintenanceManager == nil { @@ -1110,11 +1175,14 @@ func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigDat // Load configuration from persistent storage config, err := as.configPersistence.LoadMaintenanceConfig() if err != nil { - glog.Errorf("Failed to load maintenance configuration: %v", err) // Fallback to default configuration - config = DefaultMaintenanceConfig() + config = maintenance.DefaultMaintenanceConfig() } + // Note: Do NOT apply schema defaults to existing config as it overrides saved values + // Only apply defaults when creating new configs or handling fallback cases + // The schema defaults should only be used in the UI for new installations + // Get system stats from maintenance manager if available var systemStats *MaintenanceStats if as.maintenanceManager != nil { @@ -1139,18 +1207,25 @@ func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigDat } } - return &MaintenanceConfigData{ + configData := &MaintenanceConfigData{ Config: config, IsEnabled: config.Enabled, LastScanTime: systemStats.LastScanTime, NextScanTime: systemStats.NextScanTime, SystemStats: systemStats, MenuItems: maintenance.BuildMaintenanceMenuItems(), - }, nil + } + + return configData, nil } // updateMaintenanceConfig updates maintenance configuration func (as *AdminServer) updateMaintenanceConfig(config *maintenance.MaintenanceConfig) error { + // Use ConfigField validation instead of standalone validation + if err := maintenance.ValidateMaintenanceConfigWithSchema(config); err != nil { + return fmt.Errorf("configuration validation failed: %v", err) + } + // Save configuration to persistent storage if err := as.configPersistence.SaveMaintenanceConfig(config); err != nil { return fmt.Errorf("failed to save maintenance configuration: %w", err) @@ -1175,7 +1250,14 @@ func (as *AdminServer) triggerMaintenanceScan() error { return fmt.Errorf("maintenance manager not initialized") } - return as.maintenanceManager.TriggerScan() + glog.V(1).Infof("Triggering maintenance scan") + err := as.maintenanceManager.TriggerScan() + if err != nil { + glog.Errorf("Failed to trigger maintenance scan: %v", err) + return err + } + glog.V(1).Infof("Maintenance scan triggered successfully") + return nil } // TriggerTopicRetentionPurgeAPI triggers topic retention purge via HTTP API @@ -1265,14 +1347,11 @@ func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, err } // StartWorkerGrpcServer starts the worker gRPC server -func (s *AdminServer) StartWorkerGrpcServer(httpPort int) error { +func (s *AdminServer) StartWorkerGrpcServer(grpcPort int) error { if s.workerGrpcServer != nil { return fmt.Errorf("worker gRPC server is already running") } - // Calculate gRPC port (HTTP port + 10000) - grpcPort := httpPort + 10000 - s.workerGrpcServer = NewWorkerGrpcServer(s) return s.workerGrpcServer.StartWithTLS(grpcPort) } @@ -1412,7 +1491,7 @@ func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, } // Create gRPC connection - conn, err := grpc.Dial(brokerAddress, s.grpcDialOption) + conn, err := grpc.NewClient(brokerAddress, s.grpcDialOption) if err != nil { return fmt.Errorf("failed to connect to broker: %w", err) } @@ -1501,3 +1580,161 @@ func extractVersioningFromEntry(entry *filer_pb.Entry) bool { enabled, _ := s3api.LoadVersioningFromExtended(entry) return enabled } + +// GetConfigPersistence returns the config persistence manager +func (as *AdminServer) GetConfigPersistence() *ConfigPersistence { + return as.configPersistence +} + +// convertJSONToMaintenanceConfig converts JSON map to protobuf MaintenanceConfig +func convertJSONToMaintenanceConfig(jsonConfig map[string]interface{}) (*maintenance.MaintenanceConfig, error) { + config := &maintenance.MaintenanceConfig{} + + // Helper function to get int32 from interface{} + getInt32 := func(key string) (int32, error) { + if val, ok := jsonConfig[key]; ok { + switch v := val.(type) { + case int: + return int32(v), nil + case int32: + return v, nil + case int64: + return int32(v), nil + case float64: + return int32(v), nil + default: + return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v) + } + } + return 0, nil + } + + // Helper function to get bool from interface{} + getBool := func(key string) bool { + if val, ok := jsonConfig[key]; ok { + if b, ok := val.(bool); ok { + return b + } + } + return false + } + + var err error + + // Convert basic fields + config.Enabled = getBool("enabled") + + if config.ScanIntervalSeconds, err = getInt32("scan_interval_seconds"); err != nil { + return nil, err + } + if config.WorkerTimeoutSeconds, err = getInt32("worker_timeout_seconds"); err != nil { + return nil, err + } + if config.TaskTimeoutSeconds, err = getInt32("task_timeout_seconds"); err != nil { + return nil, err + } + if config.RetryDelaySeconds, err = getInt32("retry_delay_seconds"); err != nil { + return nil, err + } + if config.MaxRetries, err = getInt32("max_retries"); err != nil { + return nil, err + } + if config.CleanupIntervalSeconds, err = getInt32("cleanup_interval_seconds"); err != nil { + return nil, err + } + if config.TaskRetentionSeconds, err = getInt32("task_retention_seconds"); err != nil { + return nil, err + } + + // Convert policy if present + if policyData, ok := jsonConfig["policy"]; ok { + if policyMap, ok := policyData.(map[string]interface{}); ok { + policy := &maintenance.MaintenancePolicy{} + + if globalMaxConcurrent, err := getInt32FromMap(policyMap, "global_max_concurrent"); err != nil { + return nil, err + } else { + policy.GlobalMaxConcurrent = globalMaxConcurrent + } + + if defaultRepeatIntervalSeconds, err := getInt32FromMap(policyMap, "default_repeat_interval_seconds"); err != nil { + return nil, err + } else { + policy.DefaultRepeatIntervalSeconds = defaultRepeatIntervalSeconds + } + + if defaultCheckIntervalSeconds, err := getInt32FromMap(policyMap, "default_check_interval_seconds"); err != nil { + return nil, err + } else { + policy.DefaultCheckIntervalSeconds = defaultCheckIntervalSeconds + } + + // Convert task policies if present + if taskPoliciesData, ok := policyMap["task_policies"]; ok { + if taskPoliciesMap, ok := taskPoliciesData.(map[string]interface{}); ok { + policy.TaskPolicies = make(map[string]*maintenance.TaskPolicy) + + for taskType, taskPolicyData := range taskPoliciesMap { + if taskPolicyMap, ok := taskPolicyData.(map[string]interface{}); ok { + taskPolicy := &maintenance.TaskPolicy{} + + taskPolicy.Enabled = getBoolFromMap(taskPolicyMap, "enabled") + + if maxConcurrent, err := getInt32FromMap(taskPolicyMap, "max_concurrent"); err != nil { + return nil, err + } else { + taskPolicy.MaxConcurrent = maxConcurrent + } + + if repeatIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "repeat_interval_seconds"); err != nil { + return nil, err + } else { + taskPolicy.RepeatIntervalSeconds = repeatIntervalSeconds + } + + if checkIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "check_interval_seconds"); err != nil { + return nil, err + } else { + taskPolicy.CheckIntervalSeconds = checkIntervalSeconds + } + + policy.TaskPolicies[taskType] = taskPolicy + } + } + } + } + + config.Policy = policy + } + } + + return config, nil +} + +// Helper functions for map conversion +func getInt32FromMap(m map[string]interface{}, key string) (int32, error) { + if val, ok := m[key]; ok { + switch v := val.(type) { + case int: + return int32(v), nil + case int32: + return v, nil + case int64: + return int32(v), nil + case float64: + return int32(v), nil + default: + return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v) + } + } + return 0, nil +} + +func getBoolFromMap(m map[string]interface{}, key string) bool { + if val, ok := m[key]; ok { + if b, ok := val.(bool); ok { + return b + } + } + return false +} |
