aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/dash
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-07-06 13:57:02 -0700
committerGitHub <noreply@github.com>2025-07-06 13:57:02 -0700
commitaa668523047c273dc4065dc0f40852efcdf9e9f0 (patch)
tree87f7f145d699cf1824c8251ae71435462bfd3318 /weed/admin/dash
parent302e62d4805c60f3fdb6620b01e85859d68078ed (diff)
downloadseaweedfs-aa668523047c273dc4065dc0f40852efcdf9e9f0.tar.xz
seaweedfs-aa668523047c273dc4065dc0f40852efcdf9e9f0.zip
Admin UI add maintenance menu (#6944)
* add ui for maintenance * valid config loading. fix workers page. * refactor * grpc between admin and workers * add a long-running bidirectional grpc call between admin and worker * use the grpc call to heartbeat * use the grpc call to communicate * worker can remove the http client * admin uses http port + 10000 as its default grpc port * one task one package * handles connection failures gracefully with exponential backoff * grpc with insecure tls * grpc with optional tls * fix detecting tls * change time config from nano seconds to seconds * add tasks with 3 interfaces * compiles reducing hard coded * remove a couple of tasks * remove hard coded references * reduce hard coded values * remove hard coded values * remove hard coded from templ * refactor maintenance package * fix import cycle * simplify * simplify * auto register * auto register factory * auto register task types * self register types * refactor * simplify * remove one task * register ui * lazy init executor factories * use registered task types * DefaultWorkerConfig remove hard coded task types * remove more hard coded * implement get maintenance task * dynamic task configuration * "System Settings" should only have system level settings * adjust menu for tasks * ensure menu not collapsed * render job configuration well * use templ for ui of task configuration * fix ordering * fix bugs * saving duration in seconds * use value and unit for duration * Delete WORKER_REFACTORING_PLAN.md * Delete maintenance.json * Delete custom_worker_example.go * remove address from workers * remove old code from ec task * remove creating collection button * reconnect with exponential backoff * worker use security.toml * start admin server with tls info from security.toml * fix "weed admin" cli description
Diffstat (limited to 'weed/admin/dash')
-rw-r--r--weed/admin/dash/admin_server.go632
-rw-r--r--weed/admin/dash/config_persistence.go270
-rw-r--r--weed/admin/dash/types.go49
-rw-r--r--weed/admin/dash/worker_grpc_server.go461
4 files changed, 1411 insertions, 1 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go
index 03a44a6da..c8da2bbb7 100644
--- a/weed/admin/dash/admin_server.go
+++ b/weed/admin/dash/admin_server.go
@@ -7,6 +7,8 @@ import (
"net/http"
"time"
+ "github.com/gin-gonic/gin"
+ "github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -22,6 +24,7 @@ import (
type AdminServer struct {
masterAddress string
templateFS http.FileSystem
+ dataDir string
grpcDialOption grpc.DialOption
cacheExpiration time.Duration
lastCacheUpdate time.Time
@@ -34,17 +37,28 @@ type AdminServer struct {
// Credential management
credentialManager *credential.CredentialManager
+
+ // Configuration persistence
+ configPersistence *ConfigPersistence
+
+ // Maintenance system
+ maintenanceManager *maintenance.MaintenanceManager
+
+ // Worker gRPC server
+ workerGrpcServer *WorkerGrpcServer
}
// Type definitions moved to types.go
-func NewAdminServer(masterAddress string, templateFS http.FileSystem) *AdminServer {
+func NewAdminServer(masterAddress string, templateFS http.FileSystem, dataDir string) *AdminServer {
server := &AdminServer{
masterAddress: masterAddress,
templateFS: templateFS,
+ dataDir: dataDir,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
cacheExpiration: 10 * time.Second,
filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds
+ configPersistence: NewConfigPersistence(dataDir),
}
// Initialize credential manager with defaults
@@ -82,6 +96,27 @@ func NewAdminServer(masterAddress string, templateFS http.FileSystem) *AdminServ
}
}
+ // Initialize maintenance system with persistent configuration
+ if server.configPersistence.IsConfigured() {
+ maintenanceConfig, err := server.configPersistence.LoadMaintenanceConfig()
+ if err != nil {
+ glog.Errorf("Failed to load maintenance configuration: %v", err)
+ maintenanceConfig = maintenance.DefaultMaintenanceConfig()
+ }
+ server.InitMaintenanceManager(maintenanceConfig)
+
+ // Start maintenance manager if enabled
+ if maintenanceConfig.Enabled {
+ go func() {
+ if err := server.StartMaintenanceManager(); err != nil {
+ glog.Errorf("Failed to start maintenance manager: %v", err)
+ }
+ }()
+ }
+ } else {
+ glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode")
+ }
+
return server
}
@@ -568,3 +603,598 @@ func (s *AdminServer) GetClusterFilers() (*ClusterFilersData, error) {
// GetVolumeDetails method moved to volume_management.go
// VacuumVolume method moved to volume_management.go
+
// ShowMaintenanceQueue displays the maintenance queue page.
// On failure it responds 500 with a JSON error body.
func (as *AdminServer) ShowMaintenanceQueue(c *gin.Context) {
	data, err := as.getMaintenanceQueueData()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	// This should not render HTML template, it should use the component approach
	c.JSON(http.StatusOK, data)
}
+
+// ShowMaintenanceWorkers displays the maintenance workers page
+func (as *AdminServer) ShowMaintenanceWorkers(c *gin.Context) {
+ workers, err := as.getMaintenanceWorkers()
+ if err != nil {
+ c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+ return
+ }
+
+ // Create worker details data
+ workersData := make([]*WorkerDetailsData, 0, len(workers))
+ for _, worker := range workers {
+ details, err := as.getMaintenanceWorkerDetails(worker.ID)
+ if err != nil {
+ // Create basic worker details if we can't get full details
+ details = &WorkerDetailsData{
+ Worker: worker,
+ CurrentTasks: []*MaintenanceTask{},
+ RecentTasks: []*MaintenanceTask{},
+ Performance: &WorkerPerformance{
+ TasksCompleted: 0,
+ TasksFailed: 0,
+ AverageTaskTime: 0,
+ Uptime: 0,
+ SuccessRate: 0,
+ },
+ LastUpdated: time.Now(),
+ }
+ }
+ workersData = append(workersData, details)
+ }
+
+ c.JSON(http.StatusOK, gin.H{
+ "workers": workersData,
+ "title": "Maintenance Workers",
+ })
+}
+
// ShowMaintenanceConfig displays the maintenance configuration page.
func (as *AdminServer) ShowMaintenanceConfig(c *gin.Context) {
	config, err := as.getMaintenanceConfig()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	// This should not render HTML template, it should use the component approach
	c.JSON(http.StatusOK, config)
}

// UpdateMaintenanceConfig updates maintenance configuration from an HTML form
// post; errors render error.html, success redirects back to the config page.
func (as *AdminServer) UpdateMaintenanceConfig(c *gin.Context) {
	var config MaintenanceConfig
	if err := c.ShouldBind(&config); err != nil {
		c.HTML(http.StatusBadRequest, "error.html", gin.H{"error": err.Error()})
		return
	}

	err := as.updateMaintenanceConfig(&config)
	if err != nil {
		c.HTML(http.StatusInternalServerError, "error.html", gin.H{"error": err.Error()})
		return
	}

	// 303 See Other so the browser re-GETs the page after the POST.
	c.Redirect(http.StatusSeeOther, "/maintenance/config")
}
+
// TriggerMaintenanceScan triggers a maintenance scan and reports the
// outcome as a {"success": bool} JSON body.
func (as *AdminServer) TriggerMaintenanceScan(c *gin.Context) {
	err := as.triggerMaintenanceScan()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()})
		return
	}

	c.JSON(http.StatusOK, gin.H{"success": true, "message": "Maintenance scan triggered"})
}
+
// GetMaintenanceTasks returns maintenance tasks as JSON.
// NOTE(review): the underlying helper currently returns only pending
// tasks — confirm whether this endpoint should list all statuses.
func (as *AdminServer) GetMaintenanceTasks(c *gin.Context) {
	tasks, err := as.getMaintenanceTasks()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	c.JSON(http.StatusOK, tasks)
}

// GetMaintenanceTask returns the task named by the "id" path parameter,
// or 404 when no task with that ID exists in any status.
func (as *AdminServer) GetMaintenanceTask(c *gin.Context) {
	taskID := c.Param("id")
	task, err := as.getMaintenanceTask(taskID)
	if err != nil {
		c.JSON(http.StatusNotFound, gin.H{"error": "Task not found"})
		return
	}

	c.JSON(http.StatusOK, task)
}

// CancelMaintenanceTask cancels a pending maintenance task by ID.
func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) {
	taskID := c.Param("id")
	err := as.cancelMaintenanceTask(taskID)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"success": false, "error": err.Error()})
		return
	}

	c.JSON(http.StatusOK, gin.H{"success": true, "message": "Task cancelled"})
}

// GetMaintenanceWorkersAPI returns all maintenance workers as JSON.
func (as *AdminServer) GetMaintenanceWorkersAPI(c *gin.Context) {
	workers, err := as.getMaintenanceWorkers()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	c.JSON(http.StatusOK, workers)
}

// GetMaintenanceWorker returns details for the worker named by the "id"
// path parameter, or 404 when the worker is unknown.
func (as *AdminServer) GetMaintenanceWorker(c *gin.Context) {
	workerID := c.Param("id")
	worker, err := as.getMaintenanceWorkerDetails(workerID)
	if err != nil {
		c.JSON(http.StatusNotFound, gin.H{"error": "Worker not found"})
		return
	}

	c.JSON(http.StatusOK, worker)
}

// GetMaintenanceStats returns maintenance statistics as JSON.
func (as *AdminServer) GetMaintenanceStats(c *gin.Context) {
	stats, err := as.getMaintenanceStats()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	c.JSON(http.StatusOK, stats)
}

// GetMaintenanceConfigAPI returns the maintenance configuration as JSON.
func (as *AdminServer) GetMaintenanceConfigAPI(c *gin.Context) {
	config, err := as.getMaintenanceConfig()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	c.JSON(http.StatusOK, config)
}

// UpdateMaintenanceConfigAPI updates maintenance configuration from a
// JSON request body (JSON counterpart of UpdateMaintenanceConfig).
func (as *AdminServer) UpdateMaintenanceConfigAPI(c *gin.Context) {
	var config MaintenanceConfig
	if err := c.ShouldBindJSON(&config); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	err := as.updateMaintenanceConfig(&config)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	c.JSON(http.StatusOK, gin.H{"success": true, "message": "Configuration updated"})
}

// GetMaintenanceConfigData returns maintenance configuration data (public wrapper).
func (as *AdminServer) GetMaintenanceConfigData() (*maintenance.MaintenanceConfigData, error) {
	return as.getMaintenanceConfig()
}

// UpdateMaintenanceConfigData updates maintenance configuration (public wrapper).
func (as *AdminServer) UpdateMaintenanceConfigData(config *maintenance.MaintenanceConfig) error {
	return as.updateMaintenanceConfig(config)
}
+
// Helper methods for maintenance operations

// getMaintenanceQueueData assembles everything the maintenance queue UI
// needs: the task list, the worker list, and summary statistics.
func (as *AdminServer) getMaintenanceQueueData() (*maintenance.MaintenanceQueueData, error) {
	tasks, err := as.getMaintenanceTasks()
	if err != nil {
		return nil, err
	}

	workers, err := as.getMaintenanceWorkers()
	if err != nil {
		return nil, err
	}

	stats, err := as.getMaintenanceQueueStats()
	if err != nil {
		return nil, err
	}

	return &maintenance.MaintenanceQueueData{
		Tasks:       tasks,
		Workers:     workers,
		Stats:       stats,
		LastUpdated: time.Now(),
	}, nil
}
+
+// getMaintenanceQueueStats returns statistics for the maintenance queue
+func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, error) {
+ // This would integrate with the maintenance queue to get real statistics
+ // For now, return mock data
+ return &maintenance.QueueStats{
+ PendingTasks: 5,
+ RunningTasks: 2,
+ CompletedToday: 15,
+ FailedToday: 1,
+ TotalTasks: 23,
+ }, nil
+}
+
// getMaintenanceTasks returns maintenance tasks for the queue view.
// NOTE(review): despite the name, only TaskStatusPending tasks are
// returned — confirm whether all statuses should be included.
func (as *AdminServer) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) {
	if as.maintenanceManager == nil {
		// No manager: empty list (not an error) so callers still render.
		return []*MaintenanceTask{}, nil
	}
	return as.maintenanceManager.GetTasks(maintenance.TaskStatusPending, "", 0), nil
}
+
+// getMaintenanceTask returns a specific maintenance task
+func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, error) {
+ if as.maintenanceManager == nil {
+ return nil, fmt.Errorf("maintenance manager not initialized")
+ }
+
+ // Search for the task across all statuses since we don't know which status it has
+ statuses := []MaintenanceTaskStatus{
+ TaskStatusPending,
+ TaskStatusAssigned,
+ TaskStatusInProgress,
+ TaskStatusCompleted,
+ TaskStatusFailed,
+ TaskStatusCancelled,
+ }
+
+ for _, status := range statuses {
+ tasks := as.maintenanceManager.GetTasks(status, "", 0) // Get all tasks with this status
+ for _, task := range tasks {
+ if task.ID == taskID {
+ return task, nil
+ }
+ }
+ }
+
+ return nil, fmt.Errorf("task %s not found", taskID)
+}
+
// cancelMaintenanceTask cancels a pending maintenance task via the manager.
func (as *AdminServer) cancelMaintenanceTask(taskID string) error {
	if as.maintenanceManager == nil {
		return fmt.Errorf("maintenance manager not initialized")
	}

	return as.maintenanceManager.CancelTask(taskID)
}

// getMaintenanceWorkers returns all maintenance workers, or an empty
// list when the maintenance manager is not initialized.
func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) {
	if as.maintenanceManager == nil {
		return []*MaintenanceWorker{}, nil
	}
	return as.maintenanceManager.GetWorkers(), nil
}
+
+// getMaintenanceWorkerDetails returns detailed information about a worker
+func (as *AdminServer) getMaintenanceWorkerDetails(workerID string) (*WorkerDetailsData, error) {
+ if as.maintenanceManager == nil {
+ return nil, fmt.Errorf("maintenance manager not initialized")
+ }
+
+ workers := as.maintenanceManager.GetWorkers()
+ var targetWorker *MaintenanceWorker
+ for _, worker := range workers {
+ if worker.ID == workerID {
+ targetWorker = worker
+ break
+ }
+ }
+
+ if targetWorker == nil {
+ return nil, fmt.Errorf("worker %s not found", workerID)
+ }
+
+ // Get current tasks for this worker
+ currentTasks := as.maintenanceManager.GetTasks(TaskStatusInProgress, "", 0)
+ var workerCurrentTasks []*MaintenanceTask
+ for _, task := range currentTasks {
+ if task.WorkerID == workerID {
+ workerCurrentTasks = append(workerCurrentTasks, task)
+ }
+ }
+
+ // Get recent tasks for this worker
+ recentTasks := as.maintenanceManager.GetTasks(TaskStatusCompleted, "", 10)
+ var workerRecentTasks []*MaintenanceTask
+ for _, task := range recentTasks {
+ if task.WorkerID == workerID {
+ workerRecentTasks = append(workerRecentTasks, task)
+ }
+ }
+
+ // Calculate performance metrics
+ var totalDuration time.Duration
+ var completedTasks, failedTasks int
+ for _, task := range workerRecentTasks {
+ if task.Status == TaskStatusCompleted {
+ completedTasks++
+ if task.StartedAt != nil && task.CompletedAt != nil {
+ totalDuration += task.CompletedAt.Sub(*task.StartedAt)
+ }
+ } else if task.Status == TaskStatusFailed {
+ failedTasks++
+ }
+ }
+
+ var averageTaskTime time.Duration
+ var successRate float64
+ if completedTasks+failedTasks > 0 {
+ if completedTasks > 0 {
+ averageTaskTime = totalDuration / time.Duration(completedTasks)
+ }
+ successRate = float64(completedTasks) / float64(completedTasks+failedTasks) * 100
+ }
+
+ return &WorkerDetailsData{
+ Worker: targetWorker,
+ CurrentTasks: workerCurrentTasks,
+ RecentTasks: workerRecentTasks,
+ Performance: &WorkerPerformance{
+ TasksCompleted: completedTasks,
+ TasksFailed: failedTasks,
+ AverageTaskTime: averageTaskTime,
+ Uptime: time.Since(targetWorker.LastHeartbeat), // This should be tracked properly
+ SuccessRate: successRate,
+ },
+ LastUpdated: time.Now(),
+ }, nil
+}
+
+// getMaintenanceStats returns maintenance statistics
+func (as *AdminServer) getMaintenanceStats() (*MaintenanceStats, error) {
+ if as.maintenanceManager == nil {
+ return &MaintenanceStats{
+ TotalTasks: 0,
+ TasksByStatus: make(map[MaintenanceTaskStatus]int),
+ TasksByType: make(map[MaintenanceTaskType]int),
+ ActiveWorkers: 0,
+ }, nil
+ }
+ return as.maintenanceManager.GetStats(), nil
+}
+
// getMaintenanceConfig returns maintenance configuration plus live system
// stats for the configuration page. Config load failures fall back to
// defaults; a missing manager falls back to synthetic zero stats.
func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigData, error) {
	// Load configuration from persistent storage
	config, err := as.configPersistence.LoadMaintenanceConfig()
	if err != nil {
		glog.Errorf("Failed to load maintenance configuration: %v", err)
		// Fallback to default configuration
		config = DefaultMaintenanceConfig()
	}

	// Get system stats from maintenance manager if available
	var systemStats *MaintenanceStats
	if as.maintenanceManager != nil {
		systemStats = as.maintenanceManager.GetStats()
	} else {
		// Fallback stats: all zeros, with plausible scan times so the page
		// can still show a schedule (last scan an hour ago, next per config).
		systemStats = &MaintenanceStats{
			TotalTasks: 0,
			TasksByStatus: map[MaintenanceTaskStatus]int{
				TaskStatusPending:    0,
				TaskStatusInProgress: 0,
				TaskStatusCompleted:  0,
				TaskStatusFailed:     0,
			},
			TasksByType:     make(map[MaintenanceTaskType]int),
			ActiveWorkers:   0,
			CompletedToday:  0,
			FailedToday:     0,
			AverageTaskTime: 0,
			LastScanTime:    time.Now().Add(-time.Hour),
			NextScanTime:    time.Now().Add(time.Duration(config.ScanIntervalSeconds) * time.Second),
		}
	}

	return &MaintenanceConfigData{
		Config:       config,
		IsEnabled:    config.Enabled,
		LastScanTime: systemStats.LastScanTime,
		NextScanTime: systemStats.NextScanTime,
		SystemStats:  systemStats,
		MenuItems:    maintenance.BuildMaintenanceMenuItems(),
	}, nil
}
+
// updateMaintenanceConfig persists the new maintenance configuration and,
// when a manager is running, pushes the change to it. A manager update
// failure is logged but not returned, since the config was already saved.
func (as *AdminServer) updateMaintenanceConfig(config *maintenance.MaintenanceConfig) error {
	// Save configuration to persistent storage
	if err := as.configPersistence.SaveMaintenanceConfig(config); err != nil {
		return fmt.Errorf("failed to save maintenance configuration: %v", err)
	}

	// Update maintenance manager if available
	if as.maintenanceManager != nil {
		if err := as.maintenanceManager.UpdateConfig(config); err != nil {
			glog.Errorf("Failed to update maintenance manager config: %v", err)
			// Don't return error here, just log it
		}
	}

	glog.V(1).Infof("Updated maintenance configuration (enabled: %v, scan interval: %ds)",
		config.Enabled, config.ScanIntervalSeconds)
	return nil
}
+
// triggerMaintenanceScan asks the maintenance manager to run a scan now.
// Returns an error when the manager has not been initialized.
func (as *AdminServer) triggerMaintenanceScan() error {
	if as.maintenanceManager == nil {
		return fmt.Errorf("maintenance manager not initialized")
	}

	return as.maintenanceManager.TriggerScan()
}
+
+// GetConfigInfo returns information about the admin configuration
+func (as *AdminServer) GetConfigInfo(c *gin.Context) {
+ configInfo := as.configPersistence.GetConfigInfo()
+
+ // Add additional admin server info
+ configInfo["master_address"] = as.masterAddress
+ configInfo["cache_expiration"] = as.cacheExpiration.String()
+ configInfo["filer_cache_expiration"] = as.filerCacheExpiration.String()
+
+ // Add maintenance system info
+ if as.maintenanceManager != nil {
+ configInfo["maintenance_enabled"] = true
+ configInfo["maintenance_running"] = as.maintenanceManager.IsRunning()
+ } else {
+ configInfo["maintenance_enabled"] = false
+ configInfo["maintenance_running"] = false
+ }
+
+ c.JSON(http.StatusOK, gin.H{
+ "config_info": configInfo,
+ "title": "Configuration Information",
+ })
+}
+
+// GetMaintenanceWorkersData returns workers data for the maintenance workers page
+func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, error) {
+ workers, err := as.getMaintenanceWorkers()
+ if err != nil {
+ return nil, err
+ }
+
+ // Create worker details data
+ workersData := make([]*WorkerDetailsData, 0, len(workers))
+ activeWorkers := 0
+ busyWorkers := 0
+ totalLoad := 0
+
+ for _, worker := range workers {
+ details, err := as.getMaintenanceWorkerDetails(worker.ID)
+ if err != nil {
+ // Create basic worker details if we can't get full details
+ details = &WorkerDetailsData{
+ Worker: worker,
+ CurrentTasks: []*MaintenanceTask{},
+ RecentTasks: []*MaintenanceTask{},
+ Performance: &WorkerPerformance{
+ TasksCompleted: 0,
+ TasksFailed: 0,
+ AverageTaskTime: 0,
+ Uptime: 0,
+ SuccessRate: 0,
+ },
+ LastUpdated: time.Now(),
+ }
+ }
+ workersData = append(workersData, details)
+
+ if worker.Status == "active" {
+ activeWorkers++
+ } else if worker.Status == "busy" {
+ busyWorkers++
+ }
+ totalLoad += worker.CurrentLoad
+ }
+
+ return &MaintenanceWorkersData{
+ Workers: workersData,
+ ActiveWorkers: activeWorkers,
+ BusyWorkers: busyWorkers,
+ TotalLoad: totalLoad,
+ LastUpdated: time.Now(),
+ }, nil
+}
+
+// StartWorkerGrpcServer starts the worker gRPC server
+func (s *AdminServer) StartWorkerGrpcServer(httpPort int) error {
+ if s.workerGrpcServer != nil {
+ return fmt.Errorf("worker gRPC server is already running")
+ }
+
+ // Calculate gRPC port (HTTP port + 10000)
+ grpcPort := httpPort + 10000
+
+ s.workerGrpcServer = NewWorkerGrpcServer(s)
+ return s.workerGrpcServer.StartWithTLS(grpcPort)
+}
+
// StopWorkerGrpcServer stops the worker gRPC server if one is running and
// clears the reference so the server can be started again. No-op when the
// server was never started.
func (s *AdminServer) StopWorkerGrpcServer() error {
	if s.workerGrpcServer != nil {
		err := s.workerGrpcServer.Stop()
		s.workerGrpcServer = nil
		return err
	}
	return nil
}

// GetWorkerGrpcServer returns the worker gRPC server (nil if not started).
func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer {
	return s.workerGrpcServer
}
+
// Maintenance system integration methods

// InitMaintenanceManager initializes the maintenance manager with the given
// configuration; the admin server itself is passed as the manager's backend.
func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) {
	s.maintenanceManager = maintenance.NewMaintenanceManager(s, config)
	glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled)
}

// GetMaintenanceManager returns the maintenance manager (nil if not initialized).
func (s *AdminServer) GetMaintenanceManager() *maintenance.MaintenanceManager {
	return s.maintenanceManager
}

// StartMaintenanceManager starts the maintenance manager; it must have been
// initialized with InitMaintenanceManager first.
func (s *AdminServer) StartMaintenanceManager() error {
	if s.maintenanceManager == nil {
		return fmt.Errorf("maintenance manager not initialized")
	}
	return s.maintenanceManager.Start()
}

// StopMaintenanceManager stops the maintenance manager; no-op when it was
// never initialized.
func (s *AdminServer) StopMaintenanceManager() {
	if s.maintenanceManager != nil {
		s.maintenanceManager.Stop()
	}
}
+
// Shutdown gracefully shuts down the admin server by stopping the
// maintenance manager and then the worker gRPC server. Errors from the
// gRPC shutdown are logged but do not abort the remaining steps.
func (s *AdminServer) Shutdown() {
	glog.V(1).Infof("Shutting down admin server...")

	// Stop maintenance manager
	s.StopMaintenanceManager()

	// Stop worker gRPC server
	if err := s.StopWorkerGrpcServer(); err != nil {
		glog.Errorf("Failed to stop worker gRPC server: %v", err)
	}

	glog.V(1).Infof("Admin server shutdown complete")
}
diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go
new file mode 100644
index 000000000..93d9f6a09
--- /dev/null
+++ b/weed/admin/dash/config_persistence.go
@@ -0,0 +1,270 @@
+package dash
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
const (
	// Configuration file names stored under the admin data directory.
	MaintenanceConfigFile = "maintenance.json"
	AdminConfigFile       = "admin.json"
	// Unix permissions used when creating config directories and files.
	ConfigDirPermissions  = 0755
	ConfigFilePermissions = 0644
)
+
// ConfigPersistence handles saving and loading configuration files under a
// single data directory. An empty dataDir disables persistence: loads fall
// back to defaults and saves return an error.
type ConfigPersistence struct {
	dataDir string
}

// NewConfigPersistence creates a new configuration persistence manager
// rooted at dataDir (may be empty to disable persistence).
func NewConfigPersistence(dataDir string) *ConfigPersistence {
	return &ConfigPersistence{dataDir: dataDir}
}
+
+// SaveMaintenanceConfig saves maintenance configuration to JSON file
+func (cp *ConfigPersistence) SaveMaintenanceConfig(config *MaintenanceConfig) error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot save configuration")
+ }
+
+ configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile)
+
+ // Create directory if it doesn't exist
+ if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil {
+ return fmt.Errorf("failed to create config directory: %v", err)
+ }
+
+ // Marshal configuration to JSON
+ configData, err := json.MarshalIndent(config, "", " ")
+ if err != nil {
+ return fmt.Errorf("failed to marshal maintenance config: %v", err)
+ }
+
+ // Write to file
+ if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write maintenance config file: %v", err)
+ }
+
+ glog.V(1).Infof("Saved maintenance configuration to %s", configPath)
+ return nil
+}
+
// LoadMaintenanceConfig loads maintenance configuration from
// <dataDir>/maintenance.json. It returns DefaultMaintenanceConfig() (with
// no error) when persistence is disabled or the file does not exist, and
// an error only for read/parse failures.
func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) {
	if cp.dataDir == "" {
		glog.V(1).Infof("No data directory specified, using default maintenance configuration")
		return DefaultMaintenanceConfig(), nil
	}

	configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile)

	// Check if file exists
	if _, err := os.Stat(configPath); os.IsNotExist(err) {
		glog.V(1).Infof("Maintenance config file does not exist, using defaults: %s", configPath)
		return DefaultMaintenanceConfig(), nil
	}

	// Read file
	configData, err := os.ReadFile(configPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read maintenance config file: %v", err)
	}

	// Unmarshal JSON
	var config MaintenanceConfig
	if err := json.Unmarshal(configData, &config); err != nil {
		return nil, fmt.Errorf("failed to unmarshal maintenance config: %v", err)
	}

	glog.V(1).Infof("Loaded maintenance configuration from %s", configPath)
	return &config, nil
}
+
+// SaveAdminConfig saves general admin configuration to JSON file
+func (cp *ConfigPersistence) SaveAdminConfig(config map[string]interface{}) error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot save configuration")
+ }
+
+ configPath := filepath.Join(cp.dataDir, AdminConfigFile)
+
+ // Create directory if it doesn't exist
+ if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil {
+ return fmt.Errorf("failed to create config directory: %v", err)
+ }
+
+ // Marshal configuration to JSON
+ configData, err := json.MarshalIndent(config, "", " ")
+ if err != nil {
+ return fmt.Errorf("failed to marshal admin config: %v", err)
+ }
+
+ // Write to file
+ if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write admin config file: %v", err)
+ }
+
+ glog.V(1).Infof("Saved admin configuration to %s", configPath)
+ return nil
+}
+
// LoadAdminConfig loads the general admin configuration from
// <dataDir>/admin.json. It returns an empty map (with no error) when
// persistence is disabled or the file does not exist, and an error only
// for read/parse failures.
func (cp *ConfigPersistence) LoadAdminConfig() (map[string]interface{}, error) {
	if cp.dataDir == "" {
		glog.V(1).Infof("No data directory specified, using default admin configuration")
		return make(map[string]interface{}), nil
	}

	configPath := filepath.Join(cp.dataDir, AdminConfigFile)

	// Check if file exists
	if _, err := os.Stat(configPath); os.IsNotExist(err) {
		glog.V(1).Infof("Admin config file does not exist, using defaults: %s", configPath)
		return make(map[string]interface{}), nil
	}

	// Read file
	configData, err := os.ReadFile(configPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read admin config file: %v", err)
	}

	// Unmarshal JSON
	var config map[string]interface{}
	if err := json.Unmarshal(configData, &config); err != nil {
		return nil, fmt.Errorf("failed to unmarshal admin config: %v", err)
	}

	glog.V(1).Infof("Loaded admin configuration from %s", configPath)
	return config, nil
}
+
+// GetConfigPath returns the path to a configuration file
+func (cp *ConfigPersistence) GetConfigPath(filename string) string {
+ if cp.dataDir == "" {
+ return ""
+ }
+ return filepath.Join(cp.dataDir, filename)
+}
+
+// ListConfigFiles returns all configuration files in the data directory
+func (cp *ConfigPersistence) ListConfigFiles() ([]string, error) {
+ if cp.dataDir == "" {
+ return nil, fmt.Errorf("no data directory specified")
+ }
+
+ files, err := os.ReadDir(cp.dataDir)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read config directory: %v", err)
+ }
+
+ var configFiles []string
+ for _, file := range files {
+ if !file.IsDir() && filepath.Ext(file.Name()) == ".json" {
+ configFiles = append(configFiles, file.Name())
+ }
+ }
+
+ return configFiles, nil
+}
+
// BackupConfig creates a timestamped backup copy of a configuration file
// inside the data directory, named <filename>.backup_<timestamp>.
// NOTE(review): filename is joined into the data directory unsanitized —
// if it can ever come from user input, validate it against path traversal.
func (cp *ConfigPersistence) BackupConfig(filename string) error {
	if cp.dataDir == "" {
		return fmt.Errorf("no data directory specified")
	}

	configPath := filepath.Join(cp.dataDir, filename)
	if _, err := os.Stat(configPath); os.IsNotExist(err) {
		return fmt.Errorf("config file does not exist: %s", filename)
	}

	// Create backup filename with timestamp
	timestamp := time.Now().Format("2006-01-02_15-04-05")
	backupName := fmt.Sprintf("%s.backup_%s", filename, timestamp)
	backupPath := filepath.Join(cp.dataDir, backupName)

	// Copy file (read whole file, then write — config files are small)
	configData, err := os.ReadFile(configPath)
	if err != nil {
		return fmt.Errorf("failed to read config file: %v", err)
	}

	if err := os.WriteFile(backupPath, configData, ConfigFilePermissions); err != nil {
		return fmt.Errorf("failed to create backup: %v", err)
	}

	glog.V(1).Infof("Created backup of %s as %s", filename, backupName)
	return nil
}
+
// RestoreConfig restores a configuration file from a backup previously
// created by BackupConfig, overwriting the current file.
// NOTE(review): filename/backupName are joined unsanitized — same path
// traversal caveat as BackupConfig applies.
func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error {
	if cp.dataDir == "" {
		return fmt.Errorf("no data directory specified")
	}

	backupPath := filepath.Join(cp.dataDir, backupName)
	if _, err := os.Stat(backupPath); os.IsNotExist(err) {
		return fmt.Errorf("backup file does not exist: %s", backupName)
	}

	// Read backup file
	backupData, err := os.ReadFile(backupPath)
	if err != nil {
		return fmt.Errorf("failed to read backup file: %v", err)
	}

	// Write to config file
	configPath := filepath.Join(cp.dataDir, filename)
	if err := os.WriteFile(configPath, backupData, ConfigFilePermissions); err != nil {
		return fmt.Errorf("failed to restore config: %v", err)
	}

	glog.V(1).Infof("Restored %s from backup %s", filename, backupName)
	return nil
}
+
// GetDataDir returns the data directory path ("" when persistence is disabled).
func (cp *ConfigPersistence) GetDataDir() string {
	return cp.dataDir
}

// IsConfigured returns true if a data directory is configured.
func (cp *ConfigPersistence) IsConfigured() bool {
	return cp.dataDir != ""
}
+
+// GetConfigInfo returns information about the configuration storage
+func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
+ info := map[string]interface{}{
+ "data_dir_configured": cp.IsConfigured(),
+ "data_dir": cp.dataDir,
+ }
+
+ if cp.IsConfigured() {
+ // Check if data directory exists
+ if _, err := os.Stat(cp.dataDir); err == nil {
+ info["data_dir_exists"] = true
+
+ // List config files
+ configFiles, err := cp.ListConfigFiles()
+ if err == nil {
+ info["config_files"] = configFiles
+ }
+ } else {
+ info["data_dir_exists"] = false
+ }
+ }
+
+ return info
+}
diff --git a/weed/admin/dash/types.go b/weed/admin/dash/types.go
index 8c0be1aeb..07157d9dc 100644
--- a/weed/admin/dash/types.go
+++ b/weed/admin/dash/types.go
@@ -3,6 +3,7 @@ package dash
import (
"time"
+ "github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
@@ -197,3 +198,51 @@ type ClusterVolumeServersData struct {
TotalCapacity int64 `json:"total_capacity"`
LastUpdated time.Time `json:"last_updated"`
}
+
+// Type aliases for maintenance package types to support existing code.
+// These let callers in the dash package keep referring to dash.MaintenanceTask
+// etc. while the actual definitions live in weed/admin/maintenance.
+type MaintenanceTask = maintenance.MaintenanceTask
+type MaintenanceTaskType = maintenance.MaintenanceTaskType
+type MaintenanceTaskStatus = maintenance.MaintenanceTaskStatus
+type MaintenanceTaskPriority = maintenance.MaintenanceTaskPriority
+type MaintenanceWorker = maintenance.MaintenanceWorker
+type MaintenanceConfig = maintenance.MaintenanceConfig
+type MaintenanceStats = maintenance.MaintenanceStats
+type MaintenanceConfigData = maintenance.MaintenanceConfigData
+type MaintenanceQueueData = maintenance.MaintenanceQueueData
+type QueueStats = maintenance.QueueStats
+type WorkerDetailsData = maintenance.WorkerDetailsData
+type WorkerPerformance = maintenance.WorkerPerformance
+
+// GetTaskIcon returns the icon CSS class for a task type from its UI provider.
+// Thin forwarding wrapper over the maintenance package implementation.
+func GetTaskIcon(taskType MaintenanceTaskType) string {
+	return maintenance.GetTaskIcon(taskType)
+}
+
+// Status constants (these are still static)
+const (
+	TaskStatusPending    = maintenance.TaskStatusPending
+	TaskStatusAssigned   = maintenance.TaskStatusAssigned
+	TaskStatusInProgress = maintenance.TaskStatusInProgress
+	TaskStatusCompleted  = maintenance.TaskStatusCompleted
+	TaskStatusFailed     = maintenance.TaskStatusFailed
+	TaskStatusCancelled  = maintenance.TaskStatusCancelled
+
+	PriorityLow      = maintenance.PriorityLow
+	PriorityNormal   = maintenance.PriorityNormal
+	PriorityHigh     = maintenance.PriorityHigh
+	PriorityCritical = maintenance.PriorityCritical
+)
+
+// Helper functions from maintenance package
+var DefaultMaintenanceConfig = maintenance.DefaultMaintenanceConfig
+
+// MaintenanceWorkersData represents the data for the maintenance workers page
+type MaintenanceWorkersData struct {
+	Workers       []*WorkerDetailsData `json:"workers"`
+	ActiveWorkers int                  `json:"active_workers"`
+	BusyWorkers   int                  `json:"busy_workers"`
+	TotalLoad     int                  `json:"total_load"`
+	LastUpdated   time.Time            `json:"last_updated"`
+}
+
+// Maintenance system types are now in weed/admin/maintenance package
diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go
new file mode 100644
index 000000000..c824cc388
--- /dev/null
+++ b/weed/admin/dash/worker_grpc_server.go
@@ -0,0 +1,461 @@
+package dash
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/peer"
+)
+
+// WorkerGrpcServer implements the WorkerService gRPC interface. It accepts
+// long-lived bidirectional streams from maintenance workers, tracks their
+// connections, and bridges them to the admin server's maintenance manager.
+type WorkerGrpcServer struct {
+	worker_pb.UnimplementedWorkerServiceServer
+	adminServer *AdminServer
+
+	// Worker connection management; connMutex guards connections.
+	connections map[string]*WorkerConnection
+	connMutex   sync.RWMutex
+
+	// gRPC server lifecycle state.
+	// NOTE(review): running is read/written from multiple goroutines without
+	// synchronization (StartWithTLS, Stop, the Serve goroutine) — consider
+	// atomic.Bool or guarding it with a mutex.
+	grpcServer *grpc.Server
+	listener   net.Listener
+	running    bool
+	stopChan   chan struct{}
+}
+
+// WorkerConnection represents an active worker connection on the stream RPC.
+// outgoing carries admin->worker messages; a dedicated goroutine
+// (handleOutgoingMessages) drains it onto the stream. cancel tears the
+// connection down.
+type WorkerConnection struct {
+	workerID string
+	stream   worker_pb.WorkerService_WorkerStreamServer
+	// lastSeen is updated on every received message and read by the stale-
+	// connection cleanup; see WorkerStream / cleanupStaleConnections for the
+	// locking discipline.
+	lastSeen      time.Time
+	capabilities  []MaintenanceTaskType
+	address       string
+	maxConcurrent int32
+	outgoing      chan *worker_pb.AdminMessage
+	ctx           context.Context
+	cancel        context.CancelFunc
+}
+
+// NewWorkerGrpcServer creates a new gRPC server for worker connections.
+// The server is not listening until StartWithTLS is called.
+func NewWorkerGrpcServer(adminServer *AdminServer) *WorkerGrpcServer {
+	return &WorkerGrpcServer{
+		adminServer: adminServer,
+		connections: make(map[string]*WorkerConnection),
+		stopChan:    make(chan struct{}),
+	}
+}
+
+// StartWithTLS starts the gRPC server on the specified port with optional TLS.
+// TLS material is loaded from security.toml ("grpc.admin" section); when no
+// TLS config is present the server falls back to plaintext. Serving happens in
+// a background goroutine; the method returns immediately after setup.
+func (s *WorkerGrpcServer) StartWithTLS(port int) error {
+	// NOTE(review): s.running is checked and set without synchronization;
+	// concurrent Start/Stop calls race on it.
+	if s.running {
+		return fmt.Errorf("worker gRPC server is already running")
+	}
+
+	// Create listener on all interfaces.
+	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+	if err != nil {
+		return fmt.Errorf("failed to listen on port %d: %v", port, err)
+	}
+
+	// Create gRPC server with optional TLS
+	grpcServer := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.admin"))
+
+	worker_pb.RegisterWorkerServiceServer(grpcServer, s)
+
+	s.grpcServer = grpcServer
+	s.listener = listener
+	s.running = true
+
+	// Start cleanup routine that evicts stale worker connections.
+	go s.cleanupRoutine()
+
+	// Start serving in a goroutine; errors after Stop() are expected
+	// (listener closed) and suppressed via the running flag.
+	go func() {
+		if err := s.grpcServer.Serve(listener); err != nil {
+			if s.running {
+				glog.Errorf("Worker gRPC server error: %v", err)
+			}
+		}
+	}()
+
+	return nil
+}
+
+// Stop stops the gRPC server and tears down all worker connections.
+// Connections are shut down by cancelling their context rather than by
+// closing the outgoing channel: handler goroutines (heartbeat responses,
+// task assignments) may still be sending on outgoing concurrently, and a
+// send on a closed channel panics. Once the connection map is reset the
+// channels become unreachable and are reclaimed by the GC.
+func (s *WorkerGrpcServer) Stop() error {
+	if !s.running {
+		return nil
+	}
+
+	s.running = false
+	close(s.stopChan)
+
+	// Cancel all worker connections; handleOutgoingMessages exits on
+	// ctx.Done(), and WorkerStream unwinds via its connCtx check.
+	s.connMutex.Lock()
+	for _, conn := range s.connections {
+		conn.cancel()
+	}
+	s.connections = make(map[string]*WorkerConnection)
+	s.connMutex.Unlock()
+
+	// Stop gRPC server gracefully (drains in-flight RPCs).
+	if s.grpcServer != nil {
+		s.grpcServer.GracefulStop()
+	}
+
+	// Close listener (harmless if GracefulStop already closed it).
+	if s.listener != nil {
+		s.listener.Close()
+	}
+
+	glog.Infof("Worker gRPC server stopped")
+	return nil
+}
+
+// WorkerStream handles the long-lived bidirectional stream with a worker.
+// The first message must be a registration; after registering the worker
+// with the maintenance manager and acknowledging, the method loops receiving
+// worker messages until the stream ends or the connection is cancelled.
+func (s *WorkerGrpcServer) WorkerStream(stream worker_pb.WorkerService_WorkerStreamServer) error {
+	ctx := stream.Context()
+
+	// get client address from the transport peer info
+	address := findClientAddress(ctx)
+
+	// Wait for initial registration message
+	msg, err := stream.Recv()
+	if err != nil {
+		return fmt.Errorf("failed to receive registration message: %v", err)
+	}
+
+	registration := msg.GetRegistration()
+	if registration == nil {
+		return fmt.Errorf("first message must be registration")
+	}
+	// Trust the transport-level peer address over whatever the worker reported.
+	registration.Address = address
+
+	workerID := registration.WorkerId
+	if workerID == "" {
+		return fmt.Errorf("worker ID cannot be empty")
+	}
+
+	glog.Infof("Worker %s connecting from %s", workerID, registration.Address)
+
+	// Create worker connection with its own cancellable context so it can be
+	// torn down independently of the stream context (Stop, stale cleanup).
+	connCtx, connCancel := context.WithCancel(ctx)
+	conn := &WorkerConnection{
+		workerID:      workerID,
+		stream:        stream,
+		lastSeen:      time.Now(),
+		address:       registration.Address,
+		maxConcurrent: registration.MaxConcurrent,
+		outgoing:      make(chan *worker_pb.AdminMessage, 100),
+		ctx:           connCtx,
+		cancel:        connCancel,
+	}
+
+	// Convert protobuf capability strings to task types.
+	capabilities := make([]MaintenanceTaskType, len(registration.Capabilities))
+	for i, cap := range registration.Capabilities {
+		capabilities[i] = MaintenanceTaskType(cap)
+	}
+	conn.capabilities = capabilities
+
+	// Register connection
+	s.connMutex.Lock()
+	s.connections[workerID] = conn
+	s.connMutex.Unlock()
+
+	// Register worker with maintenance manager
+	s.registerWorkerWithManager(conn)
+
+	// Send registration response
+	regResponse := &worker_pb.AdminMessage{
+		Timestamp: time.Now().Unix(),
+		Message: &worker_pb.AdminMessage_RegistrationResponse{
+			RegistrationResponse: &worker_pb.RegistrationResponse{
+				Success: true,
+				Message: "Worker registered successfully",
+			},
+		},
+	}
+
+	select {
+	case conn.outgoing <- regResponse:
+	case <-time.After(5 * time.Second):
+		glog.Errorf("Failed to send registration response to worker %s", workerID)
+	}
+
+	// Start outgoing message handler
+	go s.handleOutgoingMessages(conn)
+
+	// Handle incoming messages
+	for {
+		// Polls cancellation once per iteration; Recv below still blocks
+		// until a message arrives or the stream errors out.
+		select {
+		case <-ctx.Done():
+			glog.Infof("Worker %s connection closed: %v", workerID, ctx.Err())
+			s.unregisterWorker(workerID)
+			return nil
+		case <-connCtx.Done():
+			glog.Infof("Worker %s connection cancelled", workerID)
+			s.unregisterWorker(workerID)
+			return nil
+		default:
+		}
+
+		msg, err := stream.Recv()
+		if err != nil {
+			if err == io.EOF {
+				glog.Infof("Worker %s disconnected", workerID)
+			} else {
+				glog.Errorf("Error receiving from worker %s: %v", workerID, err)
+			}
+			s.unregisterWorker(workerID)
+			return err
+		}
+
+		// Update lastSeen under connMutex: cleanupStaleConnections reads it
+		// while holding the same lock, so an unsynchronized write here would
+		// be a data race.
+		s.connMutex.Lock()
+		conn.lastSeen = time.Now()
+		s.connMutex.Unlock()
+
+		s.handleWorkerMessage(conn, msg)
+	}
+}
+
+// handleOutgoingMessages drains conn.outgoing onto the worker's stream.
+// It is the single writer on the stream for admin-initiated messages and
+// exits when the connection context is cancelled, the channel is closed,
+// or a stream Send fails (in which case it cancels the connection).
+func (s *WorkerGrpcServer) handleOutgoingMessages(conn *WorkerConnection) {
+	for {
+		select {
+		case <-conn.ctx.Done():
+			return
+		case msg, ok := <-conn.outgoing:
+			if !ok {
+				// Channel closed: connection is being torn down.
+				return
+			}
+
+			if err := conn.stream.Send(msg); err != nil {
+				glog.Errorf("Failed to send message to worker %s: %v", conn.workerID, err)
+				conn.cancel()
+				return
+			}
+		}
+	}
+}
+
+// handleWorkerMessage dispatches an incoming worker message to the matching
+// handler based on the protobuf oneof payload. Unknown message types are
+// logged and ignored.
+func (s *WorkerGrpcServer) handleWorkerMessage(conn *WorkerConnection, msg *worker_pb.WorkerMessage) {
+	workerID := conn.workerID
+
+	switch m := msg.Message.(type) {
+	case *worker_pb.WorkerMessage_Heartbeat:
+		s.handleHeartbeat(conn, m.Heartbeat)
+
+	case *worker_pb.WorkerMessage_TaskRequest:
+		s.handleTaskRequest(conn, m.TaskRequest)
+
+	case *worker_pb.WorkerMessage_TaskUpdate:
+		s.handleTaskUpdate(conn, m.TaskUpdate)
+
+	case *worker_pb.WorkerMessage_TaskComplete:
+		s.handleTaskCompletion(conn, m.TaskComplete)
+
+	case *worker_pb.WorkerMessage_Shutdown:
+		// Worker announced a clean shutdown; drop its connection state.
+		glog.Infof("Worker %s shutting down: %s", workerID, m.Shutdown.Reason)
+		s.unregisterWorker(workerID)
+
+	default:
+		glog.Warningf("Unknown message type from worker %s", workerID)
+	}
+}
+
+// registerWorkerWithManager registers the connected worker with the admin
+// server's maintenance manager so it can be scheduled tasks. No-op when the
+// maintenance manager is not configured.
+func (s *WorkerGrpcServer) registerWorkerWithManager(conn *WorkerConnection) {
+	if s.adminServer.maintenanceManager == nil {
+		return
+	}
+
+	// Build the manager-side worker record from the connection's
+	// registration data; load starts at zero.
+	worker := &MaintenanceWorker{
+		ID:            conn.workerID,
+		Address:       conn.address,
+		LastHeartbeat: time.Now(),
+		Status:        "active",
+		Capabilities:  conn.capabilities,
+		MaxConcurrent: int(conn.maxConcurrent),
+		CurrentLoad:   0,
+	}
+
+	s.adminServer.maintenanceManager.RegisterWorker(worker)
+	glog.V(1).Infof("Registered worker %s with maintenance manager", conn.workerID)
+}
+
+// handleHeartbeat processes a heartbeat: it refreshes the worker's liveness
+// in the maintenance manager and sends an acknowledgement back.
+// NOTE(review): the heartbeat payload itself is currently unused — only the
+// connection's workerID is forwarded; per-heartbeat stats are dropped.
+func (s *WorkerGrpcServer) handleHeartbeat(conn *WorkerConnection, heartbeat *worker_pb.WorkerHeartbeat) {
+	if s.adminServer.maintenanceManager != nil {
+		s.adminServer.maintenanceManager.UpdateWorkerHeartbeat(conn.workerID)
+	}
+
+	// Send heartbeat response
+	response := &worker_pb.AdminMessage{
+		Timestamp: time.Now().Unix(),
+		Message: &worker_pb.AdminMessage_HeartbeatResponse{
+			HeartbeatResponse: &worker_pb.HeartbeatResponse{
+				Success: true,
+				Message: "Heartbeat acknowledged",
+			},
+		},
+	}
+
+	// Non-blocking-ish send with a 1s deadline so a slow/stuck worker cannot
+	// wedge the receive loop.
+	// NOTE(review): if conn.outgoing has been closed concurrently by
+	// Stop/unregister/cleanup, this send panics — confirm teardown ordering.
+	select {
+	case conn.outgoing <- response:
+	case <-time.After(time.Second):
+		glog.Warningf("Failed to send heartbeat response to worker %s", conn.workerID)
+	}
+}
+
+// handleTaskRequest asks the maintenance manager for the next task matching
+// the worker's capabilities and, if one exists, sends a task assignment.
+// When no task is available nothing is sent (the worker is expected to poll
+// again). No-op when the maintenance manager is not configured.
+func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *worker_pb.TaskRequest) {
+	if s.adminServer.maintenanceManager == nil {
+		return
+	}
+
+	// Get next task from maintenance manager
+	task := s.adminServer.maintenanceManager.GetNextTask(conn.workerID, conn.capabilities)
+
+	if task != nil {
+		// Translate the manager's task into the protobuf assignment message.
+		assignment := &worker_pb.AdminMessage{
+			Timestamp: time.Now().Unix(),
+			Message: &worker_pb.AdminMessage_TaskAssignment{
+				TaskAssignment: &worker_pb.TaskAssignment{
+					TaskId:   task.ID,
+					TaskType: string(task.Type),
+					Params: &worker_pb.TaskParams{
+						VolumeId:   task.VolumeID,
+						Server:     task.Server,
+						Collection: task.Collection,
+						Parameters: convertTaskParameters(task.Parameters),
+					},
+					Priority:    int32(task.Priority),
+					CreatedTime: time.Now().Unix(),
+				},
+			},
+		}
+
+		// Bounded send so a stalled connection cannot block the receive loop.
+		select {
+		case conn.outgoing <- assignment:
+			glog.V(2).Infof("Assigned task %s to worker %s", task.ID, conn.workerID)
+		case <-time.After(time.Second):
+			// NOTE(review): the task was already dequeued by GetNextTask; if
+			// this send times out the assignment may be lost — verify the
+			// manager re-queues unacknowledged tasks.
+			glog.Warningf("Failed to send task assignment to worker %s", conn.workerID)
+		}
+	}
+}
+
+// handleTaskUpdate forwards a task progress update (percentage) to the
+// maintenance manager. No-op when the manager is not configured.
+func (s *WorkerGrpcServer) handleTaskUpdate(conn *WorkerConnection, update *worker_pb.TaskUpdate) {
+	if s.adminServer.maintenanceManager != nil {
+		s.adminServer.maintenanceManager.UpdateTaskProgress(update.TaskId, float64(update.Progress))
+		glog.V(3).Infof("Updated task %s progress: %.1f%%", update.TaskId, update.Progress)
+	}
+}
+
+// handleTaskCompletion reports a finished task to the maintenance manager.
+// An empty error string signals success; a non-empty one marks the task
+// failed with that message. No-op when the manager is not configured.
+func (s *WorkerGrpcServer) handleTaskCompletion(conn *WorkerConnection, completion *worker_pb.TaskComplete) {
+	if s.adminServer.maintenanceManager != nil {
+		errorMsg := ""
+		if !completion.Success {
+			errorMsg = completion.ErrorMessage
+		}
+		s.adminServer.maintenanceManager.CompleteTask(completion.TaskId, errorMsg)
+
+		if completion.Success {
+			glog.V(1).Infof("Worker %s completed task %s successfully", conn.workerID, completion.TaskId)
+		} else {
+			glog.Errorf("Worker %s failed task %s: %s", conn.workerID, completion.TaskId, completion.ErrorMessage)
+		}
+	}
+}
+
+// unregisterWorker removes a worker connection and cancels its context.
+// The outgoing channel is deliberately NOT closed here: handler goroutines
+// (heartbeat responses, task assignments) may still be sending on it
+// concurrently, and a send on a closed channel panics. Cancelling the
+// connection context stops handleOutgoingMessages, and the channel is
+// garbage-collected along with the connection. Idempotent: calling it for
+// an unknown workerID is a no-op.
+func (s *WorkerGrpcServer) unregisterWorker(workerID string) {
+	s.connMutex.Lock()
+	if conn, exists := s.connections[workerID]; exists {
+		conn.cancel()
+		delete(s.connections, workerID)
+		glog.V(1).Infof("Unregistered worker %s", workerID)
+	}
+	s.connMutex.Unlock()
+}
+
+// cleanupRoutine periodically (every 30s) evicts stale worker connections.
+// Runs as a background goroutine started by StartWithTLS; exits when
+// stopChan is closed by Stop.
+func (s *WorkerGrpcServer) cleanupRoutine() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-s.stopChan:
+			return
+		case <-ticker.C:
+			s.cleanupStaleConnections()
+		}
+	}
+}
+
+// cleanupStaleConnections removes connections with no traffic in the last
+// two minutes. conn.lastSeen is read under connMutex. The outgoing channel
+// is deliberately not closed: concurrent handlers may still be sending on
+// it, and a send on a closed channel panics — cancelling the connection
+// context is sufficient to stop its sender goroutine, after which the
+// channel is garbage-collected.
+func (s *WorkerGrpcServer) cleanupStaleConnections() {
+	cutoff := time.Now().Add(-2 * time.Minute)
+
+	s.connMutex.Lock()
+	defer s.connMutex.Unlock()
+
+	for workerID, conn := range s.connections {
+		if conn.lastSeen.Before(cutoff) {
+			glog.Warningf("Cleaning up stale worker connection: %s", workerID)
+			conn.cancel()
+			delete(s.connections, workerID)
+		}
+	}
+}
+
+// GetConnectedWorkers returns the IDs of currently connected workers.
+// The result is a snapshot; order is unspecified (map iteration).
+func (s *WorkerGrpcServer) GetConnectedWorkers() []string {
+	s.connMutex.RLock()
+	defer s.connMutex.RUnlock()
+
+	workers := make([]string, 0, len(s.connections))
+	for workerID := range s.connections {
+		workers = append(workers, workerID)
+	}
+	return workers
+}
+
+// convertTaskParameters converts task parameters to the protobuf wire format
+// (string values). Values are rendered with fmt's %v verb, which is lossy
+// for nested/structured values — the worker side only sees their string form.
+func convertTaskParameters(params map[string]interface{}) map[string]string {
+	result := make(map[string]string)
+	for key, value := range params {
+		result[key] = fmt.Sprintf("%v", value)
+	}
+	return result
+}
+
+// findClientAddress extracts the remote peer's "host:port" address from the
+// gRPC stream context. Returns "" (after logging) when peer info or the
+// address is unavailable.
+func findClientAddress(ctx context.Context) string {
+	// fmt.Printf("FromContext %+v\n", ctx)
+	pr, ok := peer.FromContext(ctx)
+	if !ok {
+		glog.Error("failed to get peer from ctx")
+		return ""
+	}
+	// Guard against a peer entry whose Addr interface is nil.
+	if pr.Addr == net.Addr(nil) {
+		glog.Error("failed to get peer address")
+		return ""
+	}
+	return pr.Addr.String()
+}