aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/dash/admin_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/dash/admin_server.go')
-rw-r--r--weed/admin/dash/admin_server.go321
1 files changed, 279 insertions, 42 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go
index 6ebade19f..376f3edc7 100644
--- a/weed/admin/dash/admin_server.go
+++ b/weed/admin/dash/admin_server.go
@@ -25,6 +25,7 @@ import (
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/s3api"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
type AdminServer struct {
@@ -126,30 +127,67 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string)
}
}
- // Initialize maintenance system with persistent configuration
+ // Initialize maintenance system - always initialize even without persistent storage
+ var maintenanceConfig *maintenance.MaintenanceConfig
if server.configPersistence.IsConfigured() {
- maintenanceConfig, err := server.configPersistence.LoadMaintenanceConfig()
+ var err error
+ maintenanceConfig, err = server.configPersistence.LoadMaintenanceConfig()
if err != nil {
glog.Errorf("Failed to load maintenance configuration: %v", err)
maintenanceConfig = maintenance.DefaultMaintenanceConfig()
}
- server.InitMaintenanceManager(maintenanceConfig)
- // Start maintenance manager if enabled
- if maintenanceConfig.Enabled {
- go func() {
- if err := server.StartMaintenanceManager(); err != nil {
- glog.Errorf("Failed to start maintenance manager: %v", err)
- }
- }()
+ // Apply new defaults to handle schema changes (like enabling by default)
+ schema := maintenance.GetMaintenanceConfigSchema()
+ if err := schema.ApplyDefaultsToProtobuf(maintenanceConfig); err != nil {
+ glog.Warningf("Failed to apply schema defaults to loaded config: %v", err)
+ }
+
+ // Force enable maintenance system for new default behavior
+ // This handles the case where old configs had Enabled=false as default
+ if !maintenanceConfig.Enabled {
+ glog.V(1).Infof("Enabling maintenance system (new default behavior)")
+ maintenanceConfig.Enabled = true
}
+
+ glog.V(1).Infof("Maintenance system initialized with persistent configuration (enabled: %v)", maintenanceConfig.Enabled)
} else {
- glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode")
+ maintenanceConfig = maintenance.DefaultMaintenanceConfig()
+ glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode (enabled: %v)", maintenanceConfig.Enabled)
+ }
+
+ // Always initialize maintenance manager
+ server.InitMaintenanceManager(maintenanceConfig)
+
+ // Load saved task configurations from persistence
+ server.loadTaskConfigurationsFromPersistence()
+
+ // Start maintenance manager if enabled
+ if maintenanceConfig.Enabled {
+ go func() {
+ // Give master client a bit of time to connect before starting scans
+ time.Sleep(2 * time.Second)
+ if err := server.StartMaintenanceManager(); err != nil {
+ glog.Errorf("Failed to start maintenance manager: %v", err)
+ }
+ }()
}
return server
}
+// loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files
+func (s *AdminServer) loadTaskConfigurationsFromPersistence() {
+ if s.configPersistence == nil || !s.configPersistence.IsConfigured() {
+ glog.V(1).Infof("Config persistence not available, using default task configurations")
+ return
+ }
+
+ // Load task configurations dynamically using the config update registry
+ configUpdateRegistry := tasks.GetGlobalConfigUpdateRegistry()
+ configUpdateRegistry.UpdateAllConfigs(s.configPersistence)
+}
+
// GetCredentialManager returns the credential manager
func (s *AdminServer) GetCredentialManager() *credential.CredentialManager {
return s.credentialManager
@@ -852,6 +890,15 @@ func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"success": true, "message": "Task cancelled"})
}
+// cancelMaintenanceTask cancels a pending maintenance task
+func (as *AdminServer) cancelMaintenanceTask(taskID string) error {
+ if as.maintenanceManager == nil {
+ return fmt.Errorf("maintenance manager not initialized")
+ }
+
+ return as.maintenanceManager.CancelTask(taskID)
+}
+
// GetMaintenanceWorkersAPI returns all maintenance workers
func (as *AdminServer) GetMaintenanceWorkersAPI(c *gin.Context) {
workers, err := as.getMaintenanceWorkers()
@@ -899,13 +946,21 @@ func (as *AdminServer) GetMaintenanceConfigAPI(c *gin.Context) {
// UpdateMaintenanceConfigAPI updates maintenance configuration via API
func (as *AdminServer) UpdateMaintenanceConfigAPI(c *gin.Context) {
- var config MaintenanceConfig
- if err := c.ShouldBindJSON(&config); err != nil {
+ // Parse JSON into a generic map first to handle type conversions
+ var jsonConfig map[string]interface{}
+ if err := c.ShouldBindJSON(&jsonConfig); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
- err := as.updateMaintenanceConfig(&config)
+ // Convert JSON map to protobuf configuration
+ config, err := convertJSONToMaintenanceConfig(jsonConfig)
+ if err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse configuration: " + err.Error()})
+ return
+ }
+
+ err = as.updateMaintenanceConfig(config)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
@@ -951,17 +1006,36 @@ func (as *AdminServer) getMaintenanceQueueData() (*maintenance.MaintenanceQueueD
}, nil
}
+// GetMaintenanceQueueStats returns statistics for the maintenance queue (exported for handlers)
+func (as *AdminServer) GetMaintenanceQueueStats() (*maintenance.QueueStats, error) {
+ return as.getMaintenanceQueueStats()
+}
+
// getMaintenanceQueueStats returns statistics for the maintenance queue
func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, error) {
- // This would integrate with the maintenance queue to get real statistics
- // For now, return mock data
- return &maintenance.QueueStats{
- PendingTasks: 5,
- RunningTasks: 2,
- CompletedToday: 15,
- FailedToday: 1,
- TotalTasks: 23,
- }, nil
+ if as.maintenanceManager == nil {
+ return &maintenance.QueueStats{
+ PendingTasks: 0,
+ RunningTasks: 0,
+ CompletedToday: 0,
+ FailedToday: 0,
+ TotalTasks: 0,
+ }, nil
+ }
+
+ // Get real statistics from maintenance manager
+ stats := as.maintenanceManager.GetStats()
+
+ // Convert MaintenanceStats to QueueStats
+ queueStats := &maintenance.QueueStats{
+ PendingTasks: stats.TasksByStatus[maintenance.TaskStatusPending],
+ RunningTasks: stats.TasksByStatus[maintenance.TaskStatusAssigned] + stats.TasksByStatus[maintenance.TaskStatusInProgress],
+ CompletedToday: stats.CompletedToday,
+ FailedToday: stats.FailedToday,
+ TotalTasks: stats.TotalTasks,
+ }
+
+ return queueStats, nil
}
// getMaintenanceTasks returns all maintenance tasks
@@ -1000,15 +1074,6 @@ func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, erro
return nil, fmt.Errorf("task %s not found", taskID)
}
-// cancelMaintenanceTask cancels a pending maintenance task
-func (as *AdminServer) cancelMaintenanceTask(taskID string) error {
- if as.maintenanceManager == nil {
- return fmt.Errorf("maintenance manager not initialized")
- }
-
- return as.maintenanceManager.CancelTask(taskID)
-}
-
// getMaintenanceWorkers returns all maintenance workers
func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) {
if as.maintenanceManager == nil {
@@ -1110,11 +1175,14 @@ func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigDat
// Load configuration from persistent storage
config, err := as.configPersistence.LoadMaintenanceConfig()
if err != nil {
- glog.Errorf("Failed to load maintenance configuration: %v", err)
// Fallback to default configuration
- config = DefaultMaintenanceConfig()
+ config = maintenance.DefaultMaintenanceConfig()
}
+ // Note: Do NOT apply schema defaults to existing config as it overrides saved values
+ // Only apply defaults when creating new configs or handling fallback cases
+ // The schema defaults should only be used in the UI for new installations
+
// Get system stats from maintenance manager if available
var systemStats *MaintenanceStats
if as.maintenanceManager != nil {
@@ -1139,18 +1207,25 @@ func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigDat
}
}
- return &MaintenanceConfigData{
+ configData := &MaintenanceConfigData{
Config: config,
IsEnabled: config.Enabled,
LastScanTime: systemStats.LastScanTime,
NextScanTime: systemStats.NextScanTime,
SystemStats: systemStats,
MenuItems: maintenance.BuildMaintenanceMenuItems(),
- }, nil
+ }
+
+ return configData, nil
}
// updateMaintenanceConfig updates maintenance configuration
func (as *AdminServer) updateMaintenanceConfig(config *maintenance.MaintenanceConfig) error {
+ // Use ConfigField validation instead of standalone validation
+ if err := maintenance.ValidateMaintenanceConfigWithSchema(config); err != nil {
+ return fmt.Errorf("configuration validation failed: %v", err)
+ }
+
// Save configuration to persistent storage
if err := as.configPersistence.SaveMaintenanceConfig(config); err != nil {
return fmt.Errorf("failed to save maintenance configuration: %w", err)
@@ -1175,7 +1250,14 @@ func (as *AdminServer) triggerMaintenanceScan() error {
return fmt.Errorf("maintenance manager not initialized")
}
- return as.maintenanceManager.TriggerScan()
+ glog.V(1).Infof("Triggering maintenance scan")
+ err := as.maintenanceManager.TriggerScan()
+ if err != nil {
+ glog.Errorf("Failed to trigger maintenance scan: %v", err)
+ return err
+ }
+ glog.V(1).Infof("Maintenance scan triggered successfully")
+ return nil
}
// TriggerTopicRetentionPurgeAPI triggers topic retention purge via HTTP API
@@ -1265,14 +1347,11 @@ func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, err
}
// StartWorkerGrpcServer starts the worker gRPC server
-func (s *AdminServer) StartWorkerGrpcServer(httpPort int) error {
+func (s *AdminServer) StartWorkerGrpcServer(grpcPort int) error {
if s.workerGrpcServer != nil {
return fmt.Errorf("worker gRPC server is already running")
}
- // Calculate gRPC port (HTTP port + 10000)
- grpcPort := httpPort + 10000
-
s.workerGrpcServer = NewWorkerGrpcServer(s)
return s.workerGrpcServer.StartWithTLS(grpcPort)
}
@@ -1412,7 +1491,7 @@ func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool,
}
// Create gRPC connection
- conn, err := grpc.Dial(brokerAddress, s.grpcDialOption)
+ conn, err := grpc.NewClient(brokerAddress, s.grpcDialOption)
if err != nil {
return fmt.Errorf("failed to connect to broker: %w", err)
}
@@ -1501,3 +1580,161 @@ func extractVersioningFromEntry(entry *filer_pb.Entry) bool {
enabled, _ := s3api.LoadVersioningFromExtended(entry)
return enabled
}
+
+// GetConfigPersistence returns the config persistence manager
+func (as *AdminServer) GetConfigPersistence() *ConfigPersistence {
+ return as.configPersistence
+}
+
+// convertJSONToMaintenanceConfig converts JSON map to protobuf MaintenanceConfig
+func convertJSONToMaintenanceConfig(jsonConfig map[string]interface{}) (*maintenance.MaintenanceConfig, error) {
+ config := &maintenance.MaintenanceConfig{}
+
+ // Helper function to get int32 from interface{}
+ getInt32 := func(key string) (int32, error) {
+ if val, ok := jsonConfig[key]; ok {
+ switch v := val.(type) {
+ case int:
+ return int32(v), nil
+ case int32:
+ return v, nil
+ case int64:
+ return int32(v), nil
+ case float64:
+ return int32(v), nil
+ default:
+ return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v)
+ }
+ }
+ return 0, nil
+ }
+
+ // Helper function to get bool from interface{}
+ getBool := func(key string) bool {
+ if val, ok := jsonConfig[key]; ok {
+ if b, ok := val.(bool); ok {
+ return b
+ }
+ }
+ return false
+ }
+
+ var err error
+
+ // Convert basic fields
+ config.Enabled = getBool("enabled")
+
+ if config.ScanIntervalSeconds, err = getInt32("scan_interval_seconds"); err != nil {
+ return nil, err
+ }
+ if config.WorkerTimeoutSeconds, err = getInt32("worker_timeout_seconds"); err != nil {
+ return nil, err
+ }
+ if config.TaskTimeoutSeconds, err = getInt32("task_timeout_seconds"); err != nil {
+ return nil, err
+ }
+ if config.RetryDelaySeconds, err = getInt32("retry_delay_seconds"); err != nil {
+ return nil, err
+ }
+ if config.MaxRetries, err = getInt32("max_retries"); err != nil {
+ return nil, err
+ }
+ if config.CleanupIntervalSeconds, err = getInt32("cleanup_interval_seconds"); err != nil {
+ return nil, err
+ }
+ if config.TaskRetentionSeconds, err = getInt32("task_retention_seconds"); err != nil {
+ return nil, err
+ }
+
+ // Convert policy if present
+ if policyData, ok := jsonConfig["policy"]; ok {
+ if policyMap, ok := policyData.(map[string]interface{}); ok {
+ policy := &maintenance.MaintenancePolicy{}
+
+ if globalMaxConcurrent, err := getInt32FromMap(policyMap, "global_max_concurrent"); err != nil {
+ return nil, err
+ } else {
+ policy.GlobalMaxConcurrent = globalMaxConcurrent
+ }
+
+ if defaultRepeatIntervalSeconds, err := getInt32FromMap(policyMap, "default_repeat_interval_seconds"); err != nil {
+ return nil, err
+ } else {
+ policy.DefaultRepeatIntervalSeconds = defaultRepeatIntervalSeconds
+ }
+
+ if defaultCheckIntervalSeconds, err := getInt32FromMap(policyMap, "default_check_interval_seconds"); err != nil {
+ return nil, err
+ } else {
+ policy.DefaultCheckIntervalSeconds = defaultCheckIntervalSeconds
+ }
+
+ // Convert task policies if present
+ if taskPoliciesData, ok := policyMap["task_policies"]; ok {
+ if taskPoliciesMap, ok := taskPoliciesData.(map[string]interface{}); ok {
+ policy.TaskPolicies = make(map[string]*maintenance.TaskPolicy)
+
+ for taskType, taskPolicyData := range taskPoliciesMap {
+ if taskPolicyMap, ok := taskPolicyData.(map[string]interface{}); ok {
+ taskPolicy := &maintenance.TaskPolicy{}
+
+ taskPolicy.Enabled = getBoolFromMap(taskPolicyMap, "enabled")
+
+ if maxConcurrent, err := getInt32FromMap(taskPolicyMap, "max_concurrent"); err != nil {
+ return nil, err
+ } else {
+ taskPolicy.MaxConcurrent = maxConcurrent
+ }
+
+ if repeatIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "repeat_interval_seconds"); err != nil {
+ return nil, err
+ } else {
+ taskPolicy.RepeatIntervalSeconds = repeatIntervalSeconds
+ }
+
+ if checkIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "check_interval_seconds"); err != nil {
+ return nil, err
+ } else {
+ taskPolicy.CheckIntervalSeconds = checkIntervalSeconds
+ }
+
+ policy.TaskPolicies[taskType] = taskPolicy
+ }
+ }
+ }
+ }
+
+ config.Policy = policy
+ }
+ }
+
+ return config, nil
+}
+
+// Helper functions for map conversion
+func getInt32FromMap(m map[string]interface{}, key string) (int32, error) {
+ if val, ok := m[key]; ok {
+ switch v := val.(type) {
+ case int:
+ return int32(v), nil
+ case int32:
+ return v, nil
+ case int64:
+ return int32(v), nil
+ case float64:
+ return int32(v), nil
+ default:
+ return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v)
+ }
+ }
+ return 0, nil
+}
+
+func getBoolFromMap(m map[string]interface{}, key string) bool {
+ if val, ok := m[key]; ok {
+ if b, ok := val.(bool); ok {
+ return b
+ }
+ }
+ return false
+}