diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-06 13:57:02 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-06 13:57:02 -0700 |
| commit | aa668523047c273dc4065dc0f40852efcdf9e9f0 (patch) | |
| tree | 87f7f145d699cf1824c8251ae71435462bfd3318 /weed/admin/dash | |
| parent | 302e62d4805c60f3fdb6620b01e85859d68078ed (diff) | |
| download | seaweedfs-aa668523047c273dc4065dc0f40852efcdf9e9f0.tar.xz seaweedfs-aa668523047c273dc4065dc0f40852efcdf9e9f0.zip | |
Admin UI add maintenance menu (#6944)
* add ui for maintenance
* valid config loading. fix workers page.
* refactor
* grpc between admin and workers
* add a long-running bidirectional grpc call between admin and worker
* use the grpc call to heartbeat
* use the grpc call to communicate
* worker can remove the http client
* admin uses http port + 10000 as its default grpc port
* one task one package
* handles connection failures gracefully with exponential backoff
* grpc with insecure tls
* grpc with optional tls
* fix detecting tls
* change time config from nanoseconds to seconds
* add tasks with 3 interfaces
* compiles reducing hard coded
* remove a couple of tasks
* remove hard coded references
* reduce hard coded values
* remove hard coded values
* remove hard coded from templ
* refactor maintenance package
* fix import cycle
* simplify
* simplify
* auto register
* auto register factory
* auto register task types
* self register types
* refactor
* simplify
* remove one task
* register ui
* lazy init executor factories
* use registered task types
* DefaultWorkerConfig remove hard coded task types
* remove more hard coded
* implement get maintenance task
* dynamic task configuration
* "System Settings" should only have system level settings
* adjust menu for tasks
* ensure menu not collapsed
* render job configuration well
* use templ for ui of task configuration
* fix ordering
* fix bugs
* saving duration in seconds
* use value and unit for duration
* Delete WORKER_REFACTORING_PLAN.md
* Delete maintenance.json
* Delete custom_worker_example.go
* remove address from workers
* remove old code from ec task
* remove creating collection button
* reconnect with exponential backoff
* worker use security.toml
* start admin server with tls info from security.toml
* fix "weed admin" cli description
Diffstat (limited to 'weed/admin/dash')
| -rw-r--r-- | weed/admin/dash/admin_server.go | 632 | ||||
| -rw-r--r-- | weed/admin/dash/config_persistence.go | 270 | ||||
| -rw-r--r-- | weed/admin/dash/types.go | 49 | ||||
| -rw-r--r-- | weed/admin/dash/worker_grpc_server.go | 461 |
4 files changed, 1411 insertions(+), 1 deletion(-)
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 03a44a6da..c8da2bbb7 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -7,6 +7,8 @@ import ( "net/http" "time" + "github.com/gin-gonic/gin" + "github.com/seaweedfs/seaweedfs/weed/admin/maintenance" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -22,6 +24,7 @@ import ( type AdminServer struct { masterAddress string templateFS http.FileSystem + dataDir string grpcDialOption grpc.DialOption cacheExpiration time.Duration lastCacheUpdate time.Time @@ -34,17 +37,28 @@ type AdminServer struct { // Credential management credentialManager *credential.CredentialManager + + // Configuration persistence + configPersistence *ConfigPersistence + + // Maintenance system + maintenanceManager *maintenance.MaintenanceManager + + // Worker gRPC server + workerGrpcServer *WorkerGrpcServer } // Type definitions moved to types.go -func NewAdminServer(masterAddress string, templateFS http.FileSystem) *AdminServer { +func NewAdminServer(masterAddress string, templateFS http.FileSystem, dataDir string) *AdminServer { server := &AdminServer{ masterAddress: masterAddress, templateFS: templateFS, + dataDir: dataDir, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), cacheExpiration: 10 * time.Second, filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds + configPersistence: NewConfigPersistence(dataDir), } // Initialize credential manager with defaults @@ -82,6 +96,27 @@ func NewAdminServer(masterAddress string, templateFS http.FileSystem) *AdminServ } } + // Initialize maintenance system with persistent configuration + if server.configPersistence.IsConfigured() { + maintenanceConfig, err := server.configPersistence.LoadMaintenanceConfig() + if err != nil { + glog.Errorf("Failed to load maintenance configuration: %v", err) + maintenanceConfig 
= maintenance.DefaultMaintenanceConfig() + } + server.InitMaintenanceManager(maintenanceConfig) + + // Start maintenance manager if enabled + if maintenanceConfig.Enabled { + go func() { + if err := server.StartMaintenanceManager(); err != nil { + glog.Errorf("Failed to start maintenance manager: %v", err) + } + }() + } + } else { + glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode") + } + return server } @@ -568,3 +603,598 @@ func (s *AdminServer) GetClusterFilers() (*ClusterFilersData, error) { // GetVolumeDetails method moved to volume_management.go // VacuumVolume method moved to volume_management.go + +// ShowMaintenanceQueue displays the maintenance queue page +func (as *AdminServer) ShowMaintenanceQueue(c *gin.Context) { + data, err := as.getMaintenanceQueueData() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + // This should not render HTML template, it should use the component approach + c.JSON(http.StatusOK, data) +} + +// ShowMaintenanceWorkers displays the maintenance workers page +func (as *AdminServer) ShowMaintenanceWorkers(c *gin.Context) { + workers, err := as.getMaintenanceWorkers() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + // Create worker details data + workersData := make([]*WorkerDetailsData, 0, len(workers)) + for _, worker := range workers { + details, err := as.getMaintenanceWorkerDetails(worker.ID) + if err != nil { + // Create basic worker details if we can't get full details + details = &WorkerDetailsData{ + Worker: worker, + CurrentTasks: []*MaintenanceTask{}, + RecentTasks: []*MaintenanceTask{}, + Performance: &WorkerPerformance{ + TasksCompleted: 0, + TasksFailed: 0, + AverageTaskTime: 0, + Uptime: 0, + SuccessRate: 0, + }, + LastUpdated: time.Now(), + } + } + workersData = append(workersData, details) + } + + c.JSON(http.StatusOK, gin.H{ + "workers": workersData, + 
"title": "Maintenance Workers", + }) +} + +// ShowMaintenanceConfig displays the maintenance configuration page +func (as *AdminServer) ShowMaintenanceConfig(c *gin.Context) { + config, err := as.getMaintenanceConfig() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + // This should not render HTML template, it should use the component approach + c.JSON(http.StatusOK, config) +} + +// UpdateMaintenanceConfig updates maintenance configuration from form +func (as *AdminServer) UpdateMaintenanceConfig(c *gin.Context) { + var config MaintenanceConfig + if err := c.ShouldBind(&config); err != nil { + c.HTML(http.StatusBadRequest, "error.html", gin.H{"error": err.Error()}) + return + } + + err := as.updateMaintenanceConfig(&config) + if err != nil { + c.HTML(http.StatusInternalServerError, "error.html", gin.H{"error": err.Error()}) + return + } + + c.Redirect(http.StatusSeeOther, "/maintenance/config") +} + +// TriggerMaintenanceScan triggers a maintenance scan +func (as *AdminServer) TriggerMaintenanceScan(c *gin.Context) { + err := as.triggerMaintenanceScan() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"success": true, "message": "Maintenance scan triggered"}) +} + +// GetMaintenanceTasks returns all maintenance tasks +func (as *AdminServer) GetMaintenanceTasks(c *gin.Context) { + tasks, err := as.getMaintenanceTasks() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, tasks) +} + +// GetMaintenanceTask returns a specific maintenance task +func (as *AdminServer) GetMaintenanceTask(c *gin.Context) { + taskID := c.Param("id") + task, err := as.getMaintenanceTask(taskID) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "Task not found"}) + return + } + + c.JSON(http.StatusOK, task) +} + +// CancelMaintenanceTask cancels a 
pending maintenance task +func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) { + taskID := c.Param("id") + err := as.cancelMaintenanceTask(taskID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"success": true, "message": "Task cancelled"}) +} + +// GetMaintenanceWorkersAPI returns all maintenance workers +func (as *AdminServer) GetMaintenanceWorkersAPI(c *gin.Context) { + workers, err := as.getMaintenanceWorkers() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, workers) +} + +// GetMaintenanceWorker returns a specific maintenance worker +func (as *AdminServer) GetMaintenanceWorker(c *gin.Context) { + workerID := c.Param("id") + worker, err := as.getMaintenanceWorkerDetails(workerID) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "Worker not found"}) + return + } + + c.JSON(http.StatusOK, worker) +} + +// GetMaintenanceStats returns maintenance statistics +func (as *AdminServer) GetMaintenanceStats(c *gin.Context) { + stats, err := as.getMaintenanceStats() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, stats) +} + +// GetMaintenanceConfigAPI returns maintenance configuration +func (as *AdminServer) GetMaintenanceConfigAPI(c *gin.Context) { + config, err := as.getMaintenanceConfig() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, config) +} + +// UpdateMaintenanceConfigAPI updates maintenance configuration via API +func (as *AdminServer) UpdateMaintenanceConfigAPI(c *gin.Context) { + var config MaintenanceConfig + if err := c.ShouldBindJSON(&config); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + err := as.updateMaintenanceConfig(&config) + if err != nil { + 
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"success": true, "message": "Configuration updated"}) +} + +// GetMaintenanceConfigData returns maintenance configuration data (public wrapper) +func (as *AdminServer) GetMaintenanceConfigData() (*maintenance.MaintenanceConfigData, error) { + return as.getMaintenanceConfig() +} + +// UpdateMaintenanceConfigData updates maintenance configuration (public wrapper) +func (as *AdminServer) UpdateMaintenanceConfigData(config *maintenance.MaintenanceConfig) error { + return as.updateMaintenanceConfig(config) +} + +// Helper methods for maintenance operations + +// getMaintenanceQueueData returns data for the maintenance queue UI +func (as *AdminServer) getMaintenanceQueueData() (*maintenance.MaintenanceQueueData, error) { + tasks, err := as.getMaintenanceTasks() + if err != nil { + return nil, err + } + + workers, err := as.getMaintenanceWorkers() + if err != nil { + return nil, err + } + + stats, err := as.getMaintenanceQueueStats() + if err != nil { + return nil, err + } + + return &maintenance.MaintenanceQueueData{ + Tasks: tasks, + Workers: workers, + Stats: stats, + LastUpdated: time.Now(), + }, nil +} + +// getMaintenanceQueueStats returns statistics for the maintenance queue +func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, error) { + // This would integrate with the maintenance queue to get real statistics + // For now, return mock data + return &maintenance.QueueStats{ + PendingTasks: 5, + RunningTasks: 2, + CompletedToday: 15, + FailedToday: 1, + TotalTasks: 23, + }, nil +} + +// getMaintenanceTasks returns all maintenance tasks +func (as *AdminServer) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) { + if as.maintenanceManager == nil { + return []*MaintenanceTask{}, nil + } + return as.maintenanceManager.GetTasks(maintenance.TaskStatusPending, "", 0), nil +} + +// getMaintenanceTask returns a specific 
maintenance task +func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, error) { + if as.maintenanceManager == nil { + return nil, fmt.Errorf("maintenance manager not initialized") + } + + // Search for the task across all statuses since we don't know which status it has + statuses := []MaintenanceTaskStatus{ + TaskStatusPending, + TaskStatusAssigned, + TaskStatusInProgress, + TaskStatusCompleted, + TaskStatusFailed, + TaskStatusCancelled, + } + + for _, status := range statuses { + tasks := as.maintenanceManager.GetTasks(status, "", 0) // Get all tasks with this status + for _, task := range tasks { + if task.ID == taskID { + return task, nil + } + } + } + + return nil, fmt.Errorf("task %s not found", taskID) +} + +// cancelMaintenanceTask cancels a pending maintenance task +func (as *AdminServer) cancelMaintenanceTask(taskID string) error { + if as.maintenanceManager == nil { + return fmt.Errorf("maintenance manager not initialized") + } + + return as.maintenanceManager.CancelTask(taskID) +} + +// getMaintenanceWorkers returns all maintenance workers +func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) { + if as.maintenanceManager == nil { + return []*MaintenanceWorker{}, nil + } + return as.maintenanceManager.GetWorkers(), nil +} + +// getMaintenanceWorkerDetails returns detailed information about a worker +func (as *AdminServer) getMaintenanceWorkerDetails(workerID string) (*WorkerDetailsData, error) { + if as.maintenanceManager == nil { + return nil, fmt.Errorf("maintenance manager not initialized") + } + + workers := as.maintenanceManager.GetWorkers() + var targetWorker *MaintenanceWorker + for _, worker := range workers { + if worker.ID == workerID { + targetWorker = worker + break + } + } + + if targetWorker == nil { + return nil, fmt.Errorf("worker %s not found", workerID) + } + + // Get current tasks for this worker + currentTasks := as.maintenanceManager.GetTasks(TaskStatusInProgress, "", 0) + 
var workerCurrentTasks []*MaintenanceTask + for _, task := range currentTasks { + if task.WorkerID == workerID { + workerCurrentTasks = append(workerCurrentTasks, task) + } + } + + // Get recent tasks for this worker + recentTasks := as.maintenanceManager.GetTasks(TaskStatusCompleted, "", 10) + var workerRecentTasks []*MaintenanceTask + for _, task := range recentTasks { + if task.WorkerID == workerID { + workerRecentTasks = append(workerRecentTasks, task) + } + } + + // Calculate performance metrics + var totalDuration time.Duration + var completedTasks, failedTasks int + for _, task := range workerRecentTasks { + if task.Status == TaskStatusCompleted { + completedTasks++ + if task.StartedAt != nil && task.CompletedAt != nil { + totalDuration += task.CompletedAt.Sub(*task.StartedAt) + } + } else if task.Status == TaskStatusFailed { + failedTasks++ + } + } + + var averageTaskTime time.Duration + var successRate float64 + if completedTasks+failedTasks > 0 { + if completedTasks > 0 { + averageTaskTime = totalDuration / time.Duration(completedTasks) + } + successRate = float64(completedTasks) / float64(completedTasks+failedTasks) * 100 + } + + return &WorkerDetailsData{ + Worker: targetWorker, + CurrentTasks: workerCurrentTasks, + RecentTasks: workerRecentTasks, + Performance: &WorkerPerformance{ + TasksCompleted: completedTasks, + TasksFailed: failedTasks, + AverageTaskTime: averageTaskTime, + Uptime: time.Since(targetWorker.LastHeartbeat), // This should be tracked properly + SuccessRate: successRate, + }, + LastUpdated: time.Now(), + }, nil +} + +// getMaintenanceStats returns maintenance statistics +func (as *AdminServer) getMaintenanceStats() (*MaintenanceStats, error) { + if as.maintenanceManager == nil { + return &MaintenanceStats{ + TotalTasks: 0, + TasksByStatus: make(map[MaintenanceTaskStatus]int), + TasksByType: make(map[MaintenanceTaskType]int), + ActiveWorkers: 0, + }, nil + } + return as.maintenanceManager.GetStats(), nil +} + +// getMaintenanceConfig 
returns maintenance configuration +func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigData, error) { + // Load configuration from persistent storage + config, err := as.configPersistence.LoadMaintenanceConfig() + if err != nil { + glog.Errorf("Failed to load maintenance configuration: %v", err) + // Fallback to default configuration + config = DefaultMaintenanceConfig() + } + + // Get system stats from maintenance manager if available + var systemStats *MaintenanceStats + if as.maintenanceManager != nil { + systemStats = as.maintenanceManager.GetStats() + } else { + // Fallback stats + systemStats = &MaintenanceStats{ + TotalTasks: 0, + TasksByStatus: map[MaintenanceTaskStatus]int{ + TaskStatusPending: 0, + TaskStatusInProgress: 0, + TaskStatusCompleted: 0, + TaskStatusFailed: 0, + }, + TasksByType: make(map[MaintenanceTaskType]int), + ActiveWorkers: 0, + CompletedToday: 0, + FailedToday: 0, + AverageTaskTime: 0, + LastScanTime: time.Now().Add(-time.Hour), + NextScanTime: time.Now().Add(time.Duration(config.ScanIntervalSeconds) * time.Second), + } + } + + return &MaintenanceConfigData{ + Config: config, + IsEnabled: config.Enabled, + LastScanTime: systemStats.LastScanTime, + NextScanTime: systemStats.NextScanTime, + SystemStats: systemStats, + MenuItems: maintenance.BuildMaintenanceMenuItems(), + }, nil +} + +// updateMaintenanceConfig updates maintenance configuration +func (as *AdminServer) updateMaintenanceConfig(config *maintenance.MaintenanceConfig) error { + // Save configuration to persistent storage + if err := as.configPersistence.SaveMaintenanceConfig(config); err != nil { + return fmt.Errorf("failed to save maintenance configuration: %v", err) + } + + // Update maintenance manager if available + if as.maintenanceManager != nil { + if err := as.maintenanceManager.UpdateConfig(config); err != nil { + glog.Errorf("Failed to update maintenance manager config: %v", err) + // Don't return error here, just log it + } + } + + 
glog.V(1).Infof("Updated maintenance configuration (enabled: %v, scan interval: %ds)", + config.Enabled, config.ScanIntervalSeconds) + return nil +} + +// triggerMaintenanceScan triggers a maintenance scan +func (as *AdminServer) triggerMaintenanceScan() error { + if as.maintenanceManager == nil { + return fmt.Errorf("maintenance manager not initialized") + } + + return as.maintenanceManager.TriggerScan() +} + +// GetConfigInfo returns information about the admin configuration +func (as *AdminServer) GetConfigInfo(c *gin.Context) { + configInfo := as.configPersistence.GetConfigInfo() + + // Add additional admin server info + configInfo["master_address"] = as.masterAddress + configInfo["cache_expiration"] = as.cacheExpiration.String() + configInfo["filer_cache_expiration"] = as.filerCacheExpiration.String() + + // Add maintenance system info + if as.maintenanceManager != nil { + configInfo["maintenance_enabled"] = true + configInfo["maintenance_running"] = as.maintenanceManager.IsRunning() + } else { + configInfo["maintenance_enabled"] = false + configInfo["maintenance_running"] = false + } + + c.JSON(http.StatusOK, gin.H{ + "config_info": configInfo, + "title": "Configuration Information", + }) +} + +// GetMaintenanceWorkersData returns workers data for the maintenance workers page +func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, error) { + workers, err := as.getMaintenanceWorkers() + if err != nil { + return nil, err + } + + // Create worker details data + workersData := make([]*WorkerDetailsData, 0, len(workers)) + activeWorkers := 0 + busyWorkers := 0 + totalLoad := 0 + + for _, worker := range workers { + details, err := as.getMaintenanceWorkerDetails(worker.ID) + if err != nil { + // Create basic worker details if we can't get full details + details = &WorkerDetailsData{ + Worker: worker, + CurrentTasks: []*MaintenanceTask{}, + RecentTasks: []*MaintenanceTask{}, + Performance: &WorkerPerformance{ + TasksCompleted: 0, + TasksFailed: 
0, + AverageTaskTime: 0, + Uptime: 0, + SuccessRate: 0, + }, + LastUpdated: time.Now(), + } + } + workersData = append(workersData, details) + + if worker.Status == "active" { + activeWorkers++ + } else if worker.Status == "busy" { + busyWorkers++ + } + totalLoad += worker.CurrentLoad + } + + return &MaintenanceWorkersData{ + Workers: workersData, + ActiveWorkers: activeWorkers, + BusyWorkers: busyWorkers, + TotalLoad: totalLoad, + LastUpdated: time.Now(), + }, nil +} + +// StartWorkerGrpcServer starts the worker gRPC server +func (s *AdminServer) StartWorkerGrpcServer(httpPort int) error { + if s.workerGrpcServer != nil { + return fmt.Errorf("worker gRPC server is already running") + } + + // Calculate gRPC port (HTTP port + 10000) + grpcPort := httpPort + 10000 + + s.workerGrpcServer = NewWorkerGrpcServer(s) + return s.workerGrpcServer.StartWithTLS(grpcPort) +} + +// StopWorkerGrpcServer stops the worker gRPC server +func (s *AdminServer) StopWorkerGrpcServer() error { + if s.workerGrpcServer != nil { + err := s.workerGrpcServer.Stop() + s.workerGrpcServer = nil + return err + } + return nil +} + +// GetWorkerGrpcServer returns the worker gRPC server +func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer { + return s.workerGrpcServer +} + +// Maintenance system integration methods + +// InitMaintenanceManager initializes the maintenance manager +func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) { + s.maintenanceManager = maintenance.NewMaintenanceManager(s, config) + glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled) +} + +// GetMaintenanceManager returns the maintenance manager +func (s *AdminServer) GetMaintenanceManager() *maintenance.MaintenanceManager { + return s.maintenanceManager +} + +// StartMaintenanceManager starts the maintenance manager +func (s *AdminServer) StartMaintenanceManager() error { + if s.maintenanceManager == nil { + return fmt.Errorf("maintenance manager not 
initialized") + } + return s.maintenanceManager.Start() +} + +// StopMaintenanceManager stops the maintenance manager +func (s *AdminServer) StopMaintenanceManager() { + if s.maintenanceManager != nil { + s.maintenanceManager.Stop() + } +} + +// Shutdown gracefully shuts down the admin server +func (s *AdminServer) Shutdown() { + glog.V(1).Infof("Shutting down admin server...") + + // Stop maintenance manager + s.StopMaintenanceManager() + + // Stop worker gRPC server + if err := s.StopWorkerGrpcServer(); err != nil { + glog.Errorf("Failed to stop worker gRPC server: %v", err) + } + + glog.V(1).Infof("Admin server shutdown complete") +} diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go new file mode 100644 index 000000000..93d9f6a09 --- /dev/null +++ b/weed/admin/dash/config_persistence.go @@ -0,0 +1,270 @@ +package dash + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +const ( + // Configuration file names + MaintenanceConfigFile = "maintenance.json" + AdminConfigFile = "admin.json" + ConfigDirPermissions = 0755 + ConfigFilePermissions = 0644 +) + +// ConfigPersistence handles saving and loading configuration files +type ConfigPersistence struct { + dataDir string +} + +// NewConfigPersistence creates a new configuration persistence manager +func NewConfigPersistence(dataDir string) *ConfigPersistence { + return &ConfigPersistence{ + dataDir: dataDir, + } +} + +// SaveMaintenanceConfig saves maintenance configuration to JSON file +func (cp *ConfigPersistence) SaveMaintenanceConfig(config *MaintenanceConfig) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save configuration") + } + + configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile) + + // Create directory if it doesn't exist + if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil { + return fmt.Errorf("failed to create config 
directory: %v", err) + } + + // Marshal configuration to JSON + configData, err := json.MarshalIndent(config, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal maintenance config: %v", err) + } + + // Write to file + if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write maintenance config file: %v", err) + } + + glog.V(1).Infof("Saved maintenance configuration to %s", configPath) + return nil +} + +// LoadMaintenanceConfig loads maintenance configuration from JSON file +func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) { + if cp.dataDir == "" { + glog.V(1).Infof("No data directory specified, using default maintenance configuration") + return DefaultMaintenanceConfig(), nil + } + + configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + glog.V(1).Infof("Maintenance config file does not exist, using defaults: %s", configPath) + return DefaultMaintenanceConfig(), nil + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read maintenance config file: %v", err) + } + + // Unmarshal JSON + var config MaintenanceConfig + if err := json.Unmarshal(configData, &config); err != nil { + return nil, fmt.Errorf("failed to unmarshal maintenance config: %v", err) + } + + glog.V(1).Infof("Loaded maintenance configuration from %s", configPath) + return &config, nil +} + +// SaveAdminConfig saves general admin configuration to JSON file +func (cp *ConfigPersistence) SaveAdminConfig(config map[string]interface{}) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save configuration") + } + + configPath := filepath.Join(cp.dataDir, AdminConfigFile) + + // Create directory if it doesn't exist + if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil { + return 
fmt.Errorf("failed to create config directory: %v", err) + } + + // Marshal configuration to JSON + configData, err := json.MarshalIndent(config, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal admin config: %v", err) + } + + // Write to file + if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write admin config file: %v", err) + } + + glog.V(1).Infof("Saved admin configuration to %s", configPath) + return nil +} + +// LoadAdminConfig loads general admin configuration from JSON file +func (cp *ConfigPersistence) LoadAdminConfig() (map[string]interface{}, error) { + if cp.dataDir == "" { + glog.V(1).Infof("No data directory specified, using default admin configuration") + return make(map[string]interface{}), nil + } + + configPath := filepath.Join(cp.dataDir, AdminConfigFile) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + glog.V(1).Infof("Admin config file does not exist, using defaults: %s", configPath) + return make(map[string]interface{}), nil + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read admin config file: %v", err) + } + + // Unmarshal JSON + var config map[string]interface{} + if err := json.Unmarshal(configData, &config); err != nil { + return nil, fmt.Errorf("failed to unmarshal admin config: %v", err) + } + + glog.V(1).Infof("Loaded admin configuration from %s", configPath) + return config, nil +} + +// GetConfigPath returns the path to a configuration file +func (cp *ConfigPersistence) GetConfigPath(filename string) string { + if cp.dataDir == "" { + return "" + } + return filepath.Join(cp.dataDir, filename) +} + +// ListConfigFiles returns all configuration files in the data directory +func (cp *ConfigPersistence) ListConfigFiles() ([]string, error) { + if cp.dataDir == "" { + return nil, fmt.Errorf("no data directory specified") + } + + files, err := 
os.ReadDir(cp.dataDir) + if err != nil { + return nil, fmt.Errorf("failed to read config directory: %v", err) + } + + var configFiles []string + for _, file := range files { + if !file.IsDir() && filepath.Ext(file.Name()) == ".json" { + configFiles = append(configFiles, file.Name()) + } + } + + return configFiles, nil +} + +// BackupConfig creates a backup of a configuration file +func (cp *ConfigPersistence) BackupConfig(filename string) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified") + } + + configPath := filepath.Join(cp.dataDir, filename) + if _, err := os.Stat(configPath); os.IsNotExist(err) { + return fmt.Errorf("config file does not exist: %s", filename) + } + + // Create backup filename with timestamp + timestamp := time.Now().Format("2006-01-02_15-04-05") + backupName := fmt.Sprintf("%s.backup_%s", filename, timestamp) + backupPath := filepath.Join(cp.dataDir, backupName) + + // Copy file + configData, err := os.ReadFile(configPath) + if err != nil { + return fmt.Errorf("failed to read config file: %v", err) + } + + if err := os.WriteFile(backupPath, configData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to create backup: %v", err) + } + + glog.V(1).Infof("Created backup of %s as %s", filename, backupName) + return nil +} + +// RestoreConfig restores a configuration file from a backup +func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified") + } + + backupPath := filepath.Join(cp.dataDir, backupName) + if _, err := os.Stat(backupPath); os.IsNotExist(err) { + return fmt.Errorf("backup file does not exist: %s", backupName) + } + + // Read backup file + backupData, err := os.ReadFile(backupPath) + if err != nil { + return fmt.Errorf("failed to read backup file: %v", err) + } + + // Write to config file + configPath := filepath.Join(cp.dataDir, filename) + if err := os.WriteFile(configPath, backupData, 
ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to restore config: %v", err) + } + + glog.V(1).Infof("Restored %s from backup %s", filename, backupName) + return nil +} + +// GetDataDir returns the data directory path +func (cp *ConfigPersistence) GetDataDir() string { + return cp.dataDir +} + +// IsConfigured returns true if a data directory is configured +func (cp *ConfigPersistence) IsConfigured() bool { + return cp.dataDir != "" +} + +// GetConfigInfo returns information about the configuration storage +func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { + info := map[string]interface{}{ + "data_dir_configured": cp.IsConfigured(), + "data_dir": cp.dataDir, + } + + if cp.IsConfigured() { + // Check if data directory exists + if _, err := os.Stat(cp.dataDir); err == nil { + info["data_dir_exists"] = true + + // List config files + configFiles, err := cp.ListConfigFiles() + if err == nil { + info["config_files"] = configFiles + } + } else { + info["data_dir_exists"] = false + } + } + + return info +} diff --git a/weed/admin/dash/types.go b/weed/admin/dash/types.go index 8c0be1aeb..07157d9dc 100644 --- a/weed/admin/dash/types.go +++ b/weed/admin/dash/types.go @@ -3,6 +3,7 @@ package dash import ( "time" + "github.com/seaweedfs/seaweedfs/weed/admin/maintenance" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) @@ -197,3 +198,51 @@ type ClusterVolumeServersData struct { TotalCapacity int64 `json:"total_capacity"` LastUpdated time.Time `json:"last_updated"` } + +// Type aliases for maintenance package types to support existing code +type MaintenanceTask = maintenance.MaintenanceTask +type MaintenanceTaskType = maintenance.MaintenanceTaskType +type MaintenanceTaskStatus = maintenance.MaintenanceTaskStatus +type MaintenanceTaskPriority = maintenance.MaintenanceTaskPriority +type MaintenanceWorker = maintenance.MaintenanceWorker +type MaintenanceConfig = maintenance.MaintenanceConfig +type MaintenanceStats = 
maintenance.MaintenanceStats +type MaintenanceConfigData = maintenance.MaintenanceConfigData +type MaintenanceQueueData = maintenance.MaintenanceQueueData +type QueueStats = maintenance.QueueStats +type WorkerDetailsData = maintenance.WorkerDetailsData +type WorkerPerformance = maintenance.WorkerPerformance + +// GetTaskIcon returns the icon CSS class for a task type from its UI provider +func GetTaskIcon(taskType MaintenanceTaskType) string { + return maintenance.GetTaskIcon(taskType) +} + +// Status constants (these are still static) +const ( + TaskStatusPending = maintenance.TaskStatusPending + TaskStatusAssigned = maintenance.TaskStatusAssigned + TaskStatusInProgress = maintenance.TaskStatusInProgress + TaskStatusCompleted = maintenance.TaskStatusCompleted + TaskStatusFailed = maintenance.TaskStatusFailed + TaskStatusCancelled = maintenance.TaskStatusCancelled + + PriorityLow = maintenance.PriorityLow + PriorityNormal = maintenance.PriorityNormal + PriorityHigh = maintenance.PriorityHigh + PriorityCritical = maintenance.PriorityCritical +) + +// Helper functions from maintenance package +var DefaultMaintenanceConfig = maintenance.DefaultMaintenanceConfig + +// MaintenanceWorkersData represents the data for the maintenance workers page +type MaintenanceWorkersData struct { + Workers []*WorkerDetailsData `json:"workers"` + ActiveWorkers int `json:"active_workers"` + BusyWorkers int `json:"busy_workers"` + TotalLoad int `json:"total_load"` + LastUpdated time.Time `json:"last_updated"` +} + +// Maintenance system types are now in weed/admin/maintenance package diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go new file mode 100644 index 000000000..c824cc388 --- /dev/null +++ b/weed/admin/dash/worker_grpc_server.go @@ -0,0 +1,461 @@ +package dash + +import ( + "context" + "fmt" + "io" + "net" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + 
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" + "google.golang.org/grpc/peer" +) + +// WorkerGrpcServer implements the WorkerService gRPC interface +type WorkerGrpcServer struct { + worker_pb.UnimplementedWorkerServiceServer + adminServer *AdminServer + + // Worker connection management + connections map[string]*WorkerConnection + connMutex sync.RWMutex + + // gRPC server + grpcServer *grpc.Server + listener net.Listener + running bool + stopChan chan struct{} +} + +// WorkerConnection represents an active worker connection +type WorkerConnection struct { + workerID string + stream worker_pb.WorkerService_WorkerStreamServer + lastSeen time.Time + capabilities []MaintenanceTaskType + address string + maxConcurrent int32 + outgoing chan *worker_pb.AdminMessage + ctx context.Context + cancel context.CancelFunc +} + +// NewWorkerGrpcServer creates a new gRPC server for worker connections +func NewWorkerGrpcServer(adminServer *AdminServer) *WorkerGrpcServer { + return &WorkerGrpcServer{ + adminServer: adminServer, + connections: make(map[string]*WorkerConnection), + stopChan: make(chan struct{}), + } +} + +// StartWithTLS starts the gRPC server on the specified port with optional TLS +func (s *WorkerGrpcServer) StartWithTLS(port int) error { + if s.running { + return fmt.Errorf("worker gRPC server is already running") + } + + // Create listener + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return fmt.Errorf("failed to listen on port %d: %v", port, err) + } + + // Create gRPC server with optional TLS + grpcServer := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.admin")) + + worker_pb.RegisterWorkerServiceServer(grpcServer, s) + + s.grpcServer = grpcServer + s.listener = listener + s.running = true + + // Start cleanup routine + go s.cleanupRoutine() + + // Start serving in a goroutine + go 
func() { + if err := s.grpcServer.Serve(listener); err != nil { + if s.running { + glog.Errorf("Worker gRPC server error: %v", err) + } + } + }() + + return nil +} + +// Stop stops the gRPC server +func (s *WorkerGrpcServer) Stop() error { + if !s.running { + return nil + } + + s.running = false + close(s.stopChan) + + // Close all worker connections + s.connMutex.Lock() + for _, conn := range s.connections { + conn.cancel() + close(conn.outgoing) + } + s.connections = make(map[string]*WorkerConnection) + s.connMutex.Unlock() + + // Stop gRPC server + if s.grpcServer != nil { + s.grpcServer.GracefulStop() + } + + // Close listener + if s.listener != nil { + s.listener.Close() + } + + glog.Infof("Worker gRPC server stopped") + return nil +} + +// WorkerStream handles bidirectional communication with workers +func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStreamServer) error { + ctx := stream.Context() + + // get client address + address := findClientAddress(ctx) + + // Wait for initial registration message + msg, err := stream.Recv() + if err != nil { + return fmt.Errorf("failed to receive registration message: %v", err) + } + + registration := msg.GetRegistration() + if registration == nil { + return fmt.Errorf("first message must be registration") + } + registration.Address = address + + workerID := registration.WorkerId + if workerID == "" { + return fmt.Errorf("worker ID cannot be empty") + } + + glog.Infof("Worker %s connecting from %s", workerID, registration.Address) + + // Create worker connection + connCtx, connCancel := context.WithCancel(ctx) + conn := &WorkerConnection{ + workerID: workerID, + stream: stream, + lastSeen: time.Now(), + address: registration.Address, + maxConcurrent: registration.MaxConcurrent, + outgoing: make(chan *worker_pb.AdminMessage, 100), + ctx: connCtx, + cancel: connCancel, + } + + // Convert capabilities + capabilities := make([]MaintenanceTaskType, len(registration.Capabilities)) + for i, cap := 
range registration.Capabilities { + capabilities[i] = MaintenanceTaskType(cap) + } + conn.capabilities = capabilities + + // Register connection + s.connMutex.Lock() + s.connections[workerID] = conn + s.connMutex.Unlock() + + // Register worker with maintenance manager + s.registerWorkerWithManager(conn) + + // Send registration response + regResponse := &worker_pb.AdminMessage{ + Timestamp: time.Now().Unix(), + Message: &worker_pb.AdminMessage_RegistrationResponse{ + RegistrationResponse: &worker_pb.RegistrationResponse{ + Success: true, + Message: "Worker registered successfully", + }, + }, + } + + select { + case conn.outgoing <- regResponse: + case <-time.After(5 * time.Second): + glog.Errorf("Failed to send registration response to worker %s", workerID) + } + + // Start outgoing message handler + go s.handleOutgoingMessages(conn) + + // Handle incoming messages + for { + select { + case <-ctx.Done(): + glog.Infof("Worker %s connection closed: %v", workerID, ctx.Err()) + s.unregisterWorker(workerID) + return nil + case <-connCtx.Done(): + glog.Infof("Worker %s connection cancelled", workerID) + s.unregisterWorker(workerID) + return nil + default: + } + + msg, err := stream.Recv() + if err != nil { + if err == io.EOF { + glog.Infof("Worker %s disconnected", workerID) + } else { + glog.Errorf("Error receiving from worker %s: %v", workerID, err) + } + s.unregisterWorker(workerID) + return err + } + + conn.lastSeen = time.Now() + s.handleWorkerMessage(conn, msg) + } +} + +// handleOutgoingMessages sends messages to worker +func (s *WorkerGrpcServer) handleOutgoingMessages(conn *WorkerConnection) { + for { + select { + case <-conn.ctx.Done(): + return + case msg, ok := <-conn.outgoing: + if !ok { + return + } + + if err := conn.stream.Send(msg); err != nil { + glog.Errorf("Failed to send message to worker %s: %v", conn.workerID, err) + conn.cancel() + return + } + } + } +} + +// handleWorkerMessage processes incoming messages from workers +func (s *WorkerGrpcServer) 
handleWorkerMessage(conn *WorkerConnection, msg *worker_pb.WorkerMessage) { + workerID := conn.workerID + + switch m := msg.Message.(type) { + case *worker_pb.WorkerMessage_Heartbeat: + s.handleHeartbeat(conn, m.Heartbeat) + + case *worker_pb.WorkerMessage_TaskRequest: + s.handleTaskRequest(conn, m.TaskRequest) + + case *worker_pb.WorkerMessage_TaskUpdate: + s.handleTaskUpdate(conn, m.TaskUpdate) + + case *worker_pb.WorkerMessage_TaskComplete: + s.handleTaskCompletion(conn, m.TaskComplete) + + case *worker_pb.WorkerMessage_Shutdown: + glog.Infof("Worker %s shutting down: %s", workerID, m.Shutdown.Reason) + s.unregisterWorker(workerID) + + default: + glog.Warningf("Unknown message type from worker %s", workerID) + } +} + +// registerWorkerWithManager registers the worker with the maintenance manager +func (s *WorkerGrpcServer) registerWorkerWithManager(conn *WorkerConnection) { + if s.adminServer.maintenanceManager == nil { + return + } + + worker := &MaintenanceWorker{ + ID: conn.workerID, + Address: conn.address, + LastHeartbeat: time.Now(), + Status: "active", + Capabilities: conn.capabilities, + MaxConcurrent: int(conn.maxConcurrent), + CurrentLoad: 0, + } + + s.adminServer.maintenanceManager.RegisterWorker(worker) + glog.V(1).Infof("Registered worker %s with maintenance manager", conn.workerID) +} + +// handleHeartbeat processes heartbeat messages +func (s *WorkerGrpcServer) handleHeartbeat(conn *WorkerConnection, heartbeat *worker_pb.WorkerHeartbeat) { + if s.adminServer.maintenanceManager != nil { + s.adminServer.maintenanceManager.UpdateWorkerHeartbeat(conn.workerID) + } + + // Send heartbeat response + response := &worker_pb.AdminMessage{ + Timestamp: time.Now().Unix(), + Message: &worker_pb.AdminMessage_HeartbeatResponse{ + HeartbeatResponse: &worker_pb.HeartbeatResponse{ + Success: true, + Message: "Heartbeat acknowledged", + }, + }, + } + + select { + case conn.outgoing <- response: + case <-time.After(time.Second): + glog.Warningf("Failed to send 
heartbeat response to worker %s", conn.workerID) + } +} + +// handleTaskRequest processes task requests from workers +func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *worker_pb.TaskRequest) { + if s.adminServer.maintenanceManager == nil { + return + } + + // Get next task from maintenance manager + task := s.adminServer.maintenanceManager.GetNextTask(conn.workerID, conn.capabilities) + + if task != nil { + // Send task assignment + assignment := &worker_pb.AdminMessage{ + Timestamp: time.Now().Unix(), + Message: &worker_pb.AdminMessage_TaskAssignment{ + TaskAssignment: &worker_pb.TaskAssignment{ + TaskId: task.ID, + TaskType: string(task.Type), + Params: &worker_pb.TaskParams{ + VolumeId: task.VolumeID, + Server: task.Server, + Collection: task.Collection, + Parameters: convertTaskParameters(task.Parameters), + }, + Priority: int32(task.Priority), + CreatedTime: time.Now().Unix(), + }, + }, + } + + select { + case conn.outgoing <- assignment: + glog.V(2).Infof("Assigned task %s to worker %s", task.ID, conn.workerID) + case <-time.After(time.Second): + glog.Warningf("Failed to send task assignment to worker %s", conn.workerID) + } + } +} + +// handleTaskUpdate processes task progress updates +func (s *WorkerGrpcServer) handleTaskUpdate(conn *WorkerConnection, update *worker_pb.TaskUpdate) { + if s.adminServer.maintenanceManager != nil { + s.adminServer.maintenanceManager.UpdateTaskProgress(update.TaskId, float64(update.Progress)) + glog.V(3).Infof("Updated task %s progress: %.1f%%", update.TaskId, update.Progress) + } +} + +// handleTaskCompletion processes task completion notifications +func (s *WorkerGrpcServer) handleTaskCompletion(conn *WorkerConnection, completion *worker_pb.TaskComplete) { + if s.adminServer.maintenanceManager != nil { + errorMsg := "" + if !completion.Success { + errorMsg = completion.ErrorMessage + } + s.adminServer.maintenanceManager.CompleteTask(completion.TaskId, errorMsg) + + if completion.Success { + 
glog.V(1).Infof("Worker %s completed task %s successfully", conn.workerID, completion.TaskId) + } else { + glog.Errorf("Worker %s failed task %s: %s", conn.workerID, completion.TaskId, completion.ErrorMessage) + } + } +} + +// unregisterWorker removes a worker connection +func (s *WorkerGrpcServer) unregisterWorker(workerID string) { + s.connMutex.Lock() + if conn, exists := s.connections[workerID]; exists { + conn.cancel() + close(conn.outgoing) + delete(s.connections, workerID) + } + s.connMutex.Unlock() + + glog.V(1).Infof("Unregistered worker %s", workerID) +} + +// cleanupRoutine periodically cleans up stale connections +func (s *WorkerGrpcServer) cleanupRoutine() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-s.stopChan: + return + case <-ticker.C: + s.cleanupStaleConnections() + } + } +} + +// cleanupStaleConnections removes connections that haven't been seen recently +func (s *WorkerGrpcServer) cleanupStaleConnections() { + cutoff := time.Now().Add(-2 * time.Minute) + + s.connMutex.Lock() + defer s.connMutex.Unlock() + + for workerID, conn := range s.connections { + if conn.lastSeen.Before(cutoff) { + glog.Warningf("Cleaning up stale worker connection: %s", workerID) + conn.cancel() + close(conn.outgoing) + delete(s.connections, workerID) + } + } +} + +// GetConnectedWorkers returns a list of currently connected workers +func (s *WorkerGrpcServer) GetConnectedWorkers() []string { + s.connMutex.RLock() + defer s.connMutex.RUnlock() + + workers := make([]string, 0, len(s.connections)) + for workerID := range s.connections { + workers = append(workers, workerID) + } + return workers +} + +// convertTaskParameters converts task parameters to protobuf format +func convertTaskParameters(params map[string]interface{}) map[string]string { + result := make(map[string]string) + for key, value := range params { + result[key] = fmt.Sprintf("%v", value) + } + return result +} + +func findClientAddress(ctx 
context.Context) string { + // fmt.Printf("FromContext %+v\n", ctx) + pr, ok := peer.FromContext(ctx) + if !ok { + glog.Error("failed to get peer from ctx") + return "" + } + if pr.Addr == net.Addr(nil) { + glog.Error("failed to get peer address") + return "" + } + return pr.Addr.String() +} |
