diff options
Diffstat (limited to 'weed/admin/dash')
| -rw-r--r-- | weed/admin/dash/admin_server.go | 321 | ||||
| -rw-r--r-- | weed/admin/dash/collection_management.go | 268 | ||||
| -rw-r--r-- | weed/admin/dash/config_persistence.go | 632 | ||||
| -rw-r--r-- | weed/admin/dash/ec_shard_management.go | 734 | ||||
| -rw-r--r-- | weed/admin/dash/middleware.go | 23 | ||||
| -rw-r--r-- | weed/admin/dash/types.go | 163 | ||||
| -rw-r--r-- | weed/admin/dash/worker_grpc_server.go | 34 |
7 files changed, 2006 insertions, 169 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go index 6ebade19f..376f3edc7 100644 --- a/weed/admin/dash/admin_server.go +++ b/weed/admin/dash/admin_server.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/s3api" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" ) type AdminServer struct { @@ -126,30 +127,67 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string) } } - // Initialize maintenance system with persistent configuration + // Initialize maintenance system - always initialize even without persistent storage + var maintenanceConfig *maintenance.MaintenanceConfig if server.configPersistence.IsConfigured() { - maintenanceConfig, err := server.configPersistence.LoadMaintenanceConfig() + var err error + maintenanceConfig, err = server.configPersistence.LoadMaintenanceConfig() if err != nil { glog.Errorf("Failed to load maintenance configuration: %v", err) maintenanceConfig = maintenance.DefaultMaintenanceConfig() } - server.InitMaintenanceManager(maintenanceConfig) - // Start maintenance manager if enabled - if maintenanceConfig.Enabled { - go func() { - if err := server.StartMaintenanceManager(); err != nil { - glog.Errorf("Failed to start maintenance manager: %v", err) - } - }() + // Apply new defaults to handle schema changes (like enabling by default) + schema := maintenance.GetMaintenanceConfigSchema() + if err := schema.ApplyDefaultsToProtobuf(maintenanceConfig); err != nil { + glog.Warningf("Failed to apply schema defaults to loaded config: %v", err) + } + + // Force enable maintenance system for new default behavior + // This handles the case where old configs had Enabled=false as default + if !maintenanceConfig.Enabled { + glog.V(1).Infof("Enabling maintenance system (new default behavior)") + maintenanceConfig.Enabled = true } + + glog.V(1).Infof("Maintenance system initialized with persistent configuration (enabled: %v)", maintenanceConfig.Enabled) } else { - glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode") + maintenanceConfig = maintenance.DefaultMaintenanceConfig() + glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode (enabled: %v)", maintenanceConfig.Enabled) + } + + // Always initialize maintenance manager + server.InitMaintenanceManager(maintenanceConfig) + + // Load saved task configurations from persistence + server.loadTaskConfigurationsFromPersistence() + + // Start maintenance manager if enabled + if maintenanceConfig.Enabled { + go func() { + // Give master client a bit of time to connect before starting scans + time.Sleep(2 * time.Second) + if err := server.StartMaintenanceManager(); err != nil { + glog.Errorf("Failed to start maintenance manager: %v", err) + } + }() } return server } +// loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files +func (s *AdminServer) loadTaskConfigurationsFromPersistence() { + if s.configPersistence == nil || !s.configPersistence.IsConfigured() { + glog.V(1).Infof("Config persistence not available, using default task configurations") + return + } + + // Load task configurations dynamically using the config update registry + configUpdateRegistry := tasks.GetGlobalConfigUpdateRegistry() + configUpdateRegistry.UpdateAllConfigs(s.configPersistence) +} + // GetCredentialManager returns the credential manager func (s *AdminServer) GetCredentialManager() *credential.CredentialManager { return s.credentialManager @@ -852,6 +890,15 @@ func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"success": true, "message": "Task cancelled"}) } +// cancelMaintenanceTask cancels a pending maintenance task +func (as *AdminServer) cancelMaintenanceTask(taskID string) error { + if as.maintenanceManager == nil { + return fmt.Errorf("maintenance manager not initialized") + } + + return as.maintenanceManager.CancelTask(taskID) +} + // GetMaintenanceWorkersAPI returns all maintenance workers func (as *AdminServer) GetMaintenanceWorkersAPI(c *gin.Context) { workers, err := as.getMaintenanceWorkers() @@ -899,13 +946,21 @@ func (as *AdminServer) GetMaintenanceConfigAPI(c *gin.Context) { // UpdateMaintenanceConfigAPI updates maintenance configuration via API func (as *AdminServer) UpdateMaintenanceConfigAPI(c *gin.Context) { - var config MaintenanceConfig - if err := c.ShouldBindJSON(&config); err != nil { + // Parse JSON into a generic map first to handle type conversions + var jsonConfig map[string]interface{} + if err := c.ShouldBindJSON(&jsonConfig); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - err := as.updateMaintenanceConfig(&config) + // Convert JSON map to protobuf configuration + config, err := convertJSONToMaintenanceConfig(jsonConfig) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse configuration: " + err.Error()}) + return + } + + err = as.updateMaintenanceConfig(config) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return @@ -951,17 +1006,36 @@ func (as *AdminServer) getMaintenanceQueueData() (*maintenance.MaintenanceQueueD }, nil } +// GetMaintenanceQueueStats returns statistics for the maintenance queue (exported for handlers) +func (as *AdminServer) GetMaintenanceQueueStats() (*maintenance.QueueStats, error) { + return as.getMaintenanceQueueStats() +} + // getMaintenanceQueueStats returns statistics for the maintenance queue func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, error) { - // This would integrate with the maintenance queue to get real statistics - // For now, return mock data - return &maintenance.QueueStats{ - PendingTasks: 5, - RunningTasks: 2, - CompletedToday: 15, - FailedToday: 1, - TotalTasks: 23, - }, nil + if as.maintenanceManager == nil { + return &maintenance.QueueStats{ + PendingTasks: 0, + RunningTasks: 0, + CompletedToday: 0, + FailedToday: 0, + TotalTasks: 0, + }, nil + } + + // Get real statistics from maintenance manager + stats := as.maintenanceManager.GetStats() + + // Convert MaintenanceStats to QueueStats + queueStats := &maintenance.QueueStats{ + PendingTasks: stats.TasksByStatus[maintenance.TaskStatusPending], + RunningTasks: stats.TasksByStatus[maintenance.TaskStatusAssigned] + stats.TasksByStatus[maintenance.TaskStatusInProgress], + CompletedToday: stats.CompletedToday, + FailedToday: stats.FailedToday, + TotalTasks: stats.TotalTasks, + } + + return queueStats, nil } // getMaintenanceTasks returns all maintenance tasks @@ -1000,15 +1074,6 @@ func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, erro return nil, fmt.Errorf("task %s not found", taskID) } -// cancelMaintenanceTask cancels a pending maintenance task -func (as *AdminServer) cancelMaintenanceTask(taskID string) error { - if as.maintenanceManager == nil { - return fmt.Errorf("maintenance manager not initialized") - } - - return as.maintenanceManager.CancelTask(taskID) -} - // getMaintenanceWorkers returns all maintenance workers func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) { if as.maintenanceManager == nil { @@ -1110,11 +1175,14 @@ func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigDat // Load configuration from persistent storage config, err := as.configPersistence.LoadMaintenanceConfig() if err != nil { - glog.Errorf("Failed to load maintenance configuration: %v", err) // Fallback to default configuration - config = DefaultMaintenanceConfig() + config = maintenance.DefaultMaintenanceConfig() } + // Note: Do NOT apply schema defaults to existing config as it overrides saved values + // Only apply defaults when creating new configs or handling fallback cases + // The schema defaults should only be used in the UI for new installations + // Get system stats from maintenance manager if available var systemStats *MaintenanceStats if as.maintenanceManager != nil { @@ -1139,18 +1207,25 @@ func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigDat } } - return &MaintenanceConfigData{ + configData := &MaintenanceConfigData{ Config: config, IsEnabled: config.Enabled, LastScanTime: systemStats.LastScanTime, NextScanTime: systemStats.NextScanTime, SystemStats: systemStats, MenuItems: maintenance.BuildMaintenanceMenuItems(), - }, nil + } + + return configData, nil } // updateMaintenanceConfig updates maintenance configuration func (as *AdminServer) updateMaintenanceConfig(config *maintenance.MaintenanceConfig) error { + // Use ConfigField validation instead of standalone validation + if err := maintenance.ValidateMaintenanceConfigWithSchema(config); err != nil { + return fmt.Errorf("configuration validation failed: %v", err) + } + // Save configuration to persistent storage if err := as.configPersistence.SaveMaintenanceConfig(config); err != nil { return fmt.Errorf("failed to save maintenance configuration: %w", err) @@ -1175,7 +1250,14 @@ func (as *AdminServer) triggerMaintenanceScan() error { return fmt.Errorf("maintenance manager not initialized") } - return as.maintenanceManager.TriggerScan() + glog.V(1).Infof("Triggering maintenance scan") + err := as.maintenanceManager.TriggerScan() + if err != nil { + glog.Errorf("Failed to trigger maintenance scan: %v", err) + return err + } + glog.V(1).Infof("Maintenance scan triggered successfully") + return nil } // TriggerTopicRetentionPurgeAPI triggers topic retention purge via HTTP API @@ -1265,14 +1347,11 @@ func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, err } // StartWorkerGrpcServer starts the worker gRPC server -func (s *AdminServer) StartWorkerGrpcServer(httpPort int) error { +func (s *AdminServer) StartWorkerGrpcServer(grpcPort int) error { if s.workerGrpcServer != nil { return fmt.Errorf("worker gRPC server is already running") } - // Calculate gRPC port (HTTP port + 10000) - grpcPort := httpPort + 10000 - s.workerGrpcServer = NewWorkerGrpcServer(s) return s.workerGrpcServer.StartWithTLS(grpcPort) } @@ -1412,7 +1491,7 @@ func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, } // Create gRPC connection - conn, err := grpc.Dial(brokerAddress, s.grpcDialOption) + conn, err := grpc.NewClient(brokerAddress, s.grpcDialOption) if err != nil { return fmt.Errorf("failed to connect to broker: %w", err) } @@ -1501,3 +1580,161 @@ func extractVersioningFromEntry(entry *filer_pb.Entry) bool { enabled, _ := s3api.LoadVersioningFromExtended(entry) return enabled } + +// GetConfigPersistence returns the config persistence manager +func (as *AdminServer) GetConfigPersistence() *ConfigPersistence { + return as.configPersistence +} + +// convertJSONToMaintenanceConfig converts JSON map to protobuf MaintenanceConfig +func convertJSONToMaintenanceConfig(jsonConfig map[string]interface{}) (*maintenance.MaintenanceConfig, error) { + config := &maintenance.MaintenanceConfig{} + + // Helper function to get int32 from interface{} + getInt32 := func(key string) (int32, error) { + if val, ok := jsonConfig[key]; ok { + switch v := val.(type) { + case int: + return int32(v), nil + case int32: + return v, nil + case int64: + return int32(v), nil + case float64: + return int32(v), nil + default: + return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v) + } + } + return 0, nil + } + + // Helper function to get bool from interface{} + getBool := func(key string) bool { + if val, ok := jsonConfig[key]; ok { + if b, ok := val.(bool); ok { + return b + } + } + return false + } + + var err error + + // Convert basic fields + config.Enabled = getBool("enabled") + + if config.ScanIntervalSeconds, err = getInt32("scan_interval_seconds"); err != nil { + return nil, err + } + if config.WorkerTimeoutSeconds, err = getInt32("worker_timeout_seconds"); err != nil { + return nil, err + } + if config.TaskTimeoutSeconds, err = getInt32("task_timeout_seconds"); err != nil { + return nil, err + } + if config.RetryDelaySeconds, err = getInt32("retry_delay_seconds"); err != nil { + return nil, err + } + if config.MaxRetries, err = getInt32("max_retries"); err != nil { + return nil, err + } + if config.CleanupIntervalSeconds, err = getInt32("cleanup_interval_seconds"); err != nil { + return nil, err + } + if config.TaskRetentionSeconds, err = getInt32("task_retention_seconds"); err != nil { + return nil, err + } + + // Convert policy if present + if policyData, ok := jsonConfig["policy"]; ok { + if policyMap, ok := policyData.(map[string]interface{}); ok { + policy := &maintenance.MaintenancePolicy{} + + if globalMaxConcurrent, err := getInt32FromMap(policyMap, "global_max_concurrent"); err != nil { + return nil, err + } else { + policy.GlobalMaxConcurrent = globalMaxConcurrent + } + + if defaultRepeatIntervalSeconds, err := getInt32FromMap(policyMap, "default_repeat_interval_seconds"); err != nil { + return nil, err + } else { + policy.DefaultRepeatIntervalSeconds = defaultRepeatIntervalSeconds + } + + if defaultCheckIntervalSeconds, err := getInt32FromMap(policyMap, "default_check_interval_seconds"); err != nil { + return nil, err + } else { + policy.DefaultCheckIntervalSeconds = defaultCheckIntervalSeconds + } + + // Convert task policies if present + if taskPoliciesData, ok := policyMap["task_policies"]; ok { + if taskPoliciesMap, ok := taskPoliciesData.(map[string]interface{}); ok { + policy.TaskPolicies = make(map[string]*maintenance.TaskPolicy) + + for taskType, taskPolicyData := range taskPoliciesMap { + if taskPolicyMap, ok := taskPolicyData.(map[string]interface{}); ok { + taskPolicy := &maintenance.TaskPolicy{} + + taskPolicy.Enabled = getBoolFromMap(taskPolicyMap, "enabled") + + if maxConcurrent, err := getInt32FromMap(taskPolicyMap, "max_concurrent"); err != nil { + return nil, err + } else { + taskPolicy.MaxConcurrent = maxConcurrent + } + + if repeatIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "repeat_interval_seconds"); err != nil { + return nil, err + } else { + taskPolicy.RepeatIntervalSeconds = repeatIntervalSeconds + } + + if checkIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "check_interval_seconds"); err != nil { + return nil, err + } else { + taskPolicy.CheckIntervalSeconds = checkIntervalSeconds + } + + policy.TaskPolicies[taskType] = taskPolicy + } + } + } + } + + config.Policy = policy + } + } + + return config, nil +} + +// Helper functions for map conversion +func getInt32FromMap(m map[string]interface{}, key string) (int32, error) { + if val, ok := m[key]; ok { + switch v := val.(type) { + case int: + return int32(v), nil + case int32: + return v, nil + case int64: + return int32(v), nil + case float64: + return int32(v), nil + default: + return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v) + } + } + return 0, nil +} + +func getBoolFromMap(m map[string]interface{}, key string) bool { + if val, ok := m[key]; ok { + if b, ok := val.(bool); ok { + return b + } + } + return false +} diff --git a/weed/admin/dash/collection_management.go b/weed/admin/dash/collection_management.go index a70c82918..03c1e452b 100644 --- a/weed/admin/dash/collection_management.go +++ b/weed/admin/dash/collection_management.go @@ -12,6 +12,7 @@ import ( func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) { var collections []CollectionInfo var totalVolumes int + var totalEcVolumes int var totalFiles int64 var totalSize int64 collectionMap := make(map[string]*CollectionInfo) @@ -28,6 +29,7 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) { for _, rack := range dc.RackInfos { for _, node := range rack.DataNodeInfos { for _, diskInfo := range node.DiskInfos { + // Process regular volumes for _, volInfo := range diskInfo.VolumeInfos { // Extract collection name from volume info collectionName := volInfo.Collection @@ -69,12 +71,13 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) { totalSize += int64(volInfo.Size) } else { newCollection := CollectionInfo{ - Name: collectionName, - DataCenter: dc.Id, - VolumeCount: 1, - FileCount: int64(volInfo.FileCount), - TotalSize: int64(volInfo.Size), - DiskTypes: []string{diskType}, + Name: collectionName, + DataCenter: dc.Id, + VolumeCount: 1, + EcVolumeCount: 0, + FileCount: int64(volInfo.FileCount), + TotalSize: int64(volInfo.Size), + DiskTypes: []string{diskType}, } collectionMap[collectionName] = &newCollection totalVolumes++ @@ -82,6 +85,63 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) { totalSize += int64(volInfo.Size) } } + + // Process EC volumes + ecVolumeMap := make(map[uint32]bool) // Track unique EC volumes to avoid double counting + for _, ecShardInfo := range diskInfo.EcShardInfos { + // Extract collection name from EC shard info + collectionName := ecShardInfo.Collection + if collectionName == "" { + collectionName = "default" // Default collection for EC volumes without explicit collection + } + + // Only count each EC volume once (not per shard) + if !ecVolumeMap[ecShardInfo.Id] { + ecVolumeMap[ecShardInfo.Id] = true + + // Get disk type from disk info, default to hdd if empty + diskType := diskInfo.Type + if diskType == "" { + diskType = "hdd" + } + + // Get or create collection info + if collection, exists := collectionMap[collectionName]; exists { + collection.EcVolumeCount++ + + // Update data center if this collection spans multiple DCs + if collection.DataCenter != dc.Id && collection.DataCenter != "multi" { + collection.DataCenter = "multi" + } + + // Add disk type if not already present + diskTypeExists := false + for _, existingDiskType := range collection.DiskTypes { + if existingDiskType == diskType { + diskTypeExists = true + break + } + } + if !diskTypeExists { + collection.DiskTypes = append(collection.DiskTypes, diskType) + } + + totalEcVolumes++ + } else { + newCollection := CollectionInfo{ + Name: collectionName, + DataCenter: dc.Id, + VolumeCount: 0, + EcVolumeCount: 1, + FileCount: 0, + TotalSize: 0, + DiskTypes: []string{diskType}, + } + collectionMap[collectionName] = &newCollection + totalEcVolumes++ + } + } + } } } } @@ -112,6 +172,7 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) { Collections: []CollectionInfo{}, TotalCollections: 0, TotalVolumes: 0, + TotalEcVolumes: 0, TotalFiles: 0, TotalSize: 0, LastUpdated: time.Now(), @@ -122,8 +183,203 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) { Collections: collections, TotalCollections: len(collections), TotalVolumes: totalVolumes, + TotalEcVolumes: totalEcVolumes, TotalFiles: totalFiles, TotalSize: totalSize, LastUpdated: time.Now(), }, nil } + +// GetCollectionDetails retrieves detailed information for a specific collection including volumes and EC volumes +func (s *AdminServer) GetCollectionDetails(collectionName string, page int, pageSize int, sortBy string, sortOrder string) (*CollectionDetailsData, error) { + // Set defaults + if page < 1 { + page = 1 + } + if pageSize < 1 || pageSize > 1000 { + pageSize = 25 + } + if sortBy == "" { + sortBy = "volume_id" + } + if sortOrder == "" { + sortOrder = "asc" + } + + var regularVolumes []VolumeWithTopology + var ecVolumes []EcVolumeWithShards + var totalFiles int64 + var totalSize int64 + dataCenters := make(map[string]bool) + diskTypes := make(map[string]bool) + + // Get regular volumes for this collection + regularVolumeData, err := s.GetClusterVolumes(1, 10000, "volume_id", "asc", collectionName) // Get all volumes + if err != nil { + return nil, err + } + + regularVolumes = regularVolumeData.Volumes + totalSize = regularVolumeData.TotalSize + + // Calculate total files from regular volumes + for _, vol := range regularVolumes { + totalFiles += int64(vol.FileCount) + } + + // Collect data centers and disk types from regular volumes + for _, vol := range regularVolumes { + dataCenters[vol.DataCenter] = true + diskTypes[vol.DiskType] = true + } + + // Get EC volumes for this collection + ecVolumeData, err := s.GetClusterEcVolumes(1, 10000, "volume_id", "asc", collectionName) // Get all EC volumes + if err != nil { + return nil, err + } + + ecVolumes = ecVolumeData.EcVolumes + + // Collect data centers from EC volumes + for _, ecVol := range ecVolumes { + for _, dc := range ecVol.DataCenters { + dataCenters[dc] = true + } + } + + // Combine all volumes for sorting and pagination + type VolumeForSorting struct { + Type string // "regular" or "ec" + RegularVolume *VolumeWithTopology + EcVolume *EcVolumeWithShards + } + + var allVolumes []VolumeForSorting + for i := range regularVolumes { + allVolumes = append(allVolumes, VolumeForSorting{ + Type: "regular", + RegularVolume: ®ularVolumes[i], + }) + } + for i := range ecVolumes { + allVolumes = append(allVolumes, VolumeForSorting{ + Type: "ec", + EcVolume: &ecVolumes[i], + }) + } + + // Sort all volumes + sort.Slice(allVolumes, func(i, j int) bool { + var less bool + switch sortBy { + case "volume_id": + var idI, idJ uint32 + if allVolumes[i].Type == "regular" { + idI = allVolumes[i].RegularVolume.Id + } else { + idI = allVolumes[i].EcVolume.VolumeID + } + if allVolumes[j].Type == "regular" { + idJ = allVolumes[j].RegularVolume.Id + } else { + idJ = allVolumes[j].EcVolume.VolumeID + } + less = idI < idJ + case "type": + // Sort by type first (regular before ec), then by volume ID + if allVolumes[i].Type == allVolumes[j].Type { + var idI, idJ uint32 + if allVolumes[i].Type == "regular" { + idI = allVolumes[i].RegularVolume.Id + } else { + idI = allVolumes[i].EcVolume.VolumeID + } + if allVolumes[j].Type == "regular" { + idJ = allVolumes[j].RegularVolume.Id + } else { + idJ = allVolumes[j].EcVolume.VolumeID + } + less = idI < idJ + } else { + less = allVolumes[i].Type < allVolumes[j].Type // "ec" < "regular" + } + default: + // Default to volume ID sort + var idI, idJ uint32 + if allVolumes[i].Type == "regular" { + idI = allVolumes[i].RegularVolume.Id + } else { + idI = allVolumes[i].EcVolume.VolumeID + } + if allVolumes[j].Type == "regular" { + idJ = allVolumes[j].RegularVolume.Id + } else { + idJ = allVolumes[j].EcVolume.VolumeID + } + less = idI < idJ + } + + if sortOrder == "desc" { + return !less + } + return less + }) + + // Apply pagination + totalVolumesAndEc := len(allVolumes) + totalPages := (totalVolumesAndEc + pageSize - 1) / pageSize + startIndex := (page - 1) * pageSize + endIndex := startIndex + pageSize + if endIndex > totalVolumesAndEc { + endIndex = totalVolumesAndEc + } + + if startIndex >= totalVolumesAndEc { + startIndex = 0 + endIndex = 0 + } + + // Extract paginated results + var paginatedRegularVolumes []VolumeWithTopology + var paginatedEcVolumes []EcVolumeWithShards + + for i := startIndex; i < endIndex; i++ { + if allVolumes[i].Type == "regular" { + paginatedRegularVolumes = append(paginatedRegularVolumes, *allVolumes[i].RegularVolume) + } else { + paginatedEcVolumes = append(paginatedEcVolumes, *allVolumes[i].EcVolume) + } + } + + // Convert maps to slices + var dcList []string + for dc := range dataCenters { + dcList = append(dcList, dc) + } + sort.Strings(dcList) + + var diskTypeList []string + for diskType := range diskTypes { + diskTypeList = append(diskTypeList, diskType) + } + sort.Strings(diskTypeList) + + return &CollectionDetailsData{ + CollectionName: collectionName, + RegularVolumes: paginatedRegularVolumes, + EcVolumes: paginatedEcVolumes, + TotalVolumes: len(regularVolumes), + TotalEcVolumes: len(ecVolumes), + TotalFiles: totalFiles, + TotalSize: totalSize, + DataCenters: dcList, + DiskTypes: diskTypeList, + LastUpdated: time.Now(), + Page: page, + PageSize: pageSize, + TotalPages: totalPages, + SortBy: sortBy, + SortOrder: sortOrder, + }, nil +} diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index a2f74f4e7..b6b3074ab 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -1,23 +1,50 @@ package dash import ( - "encoding/json" "fmt" "os" "path/filepath" "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" ) const ( - // Configuration file names - MaintenanceConfigFile = "maintenance.json" - AdminConfigFile = "admin.json" + // Configuration subdirectory + ConfigSubdir = "conf" + + // Configuration file names (protobuf binary) + MaintenanceConfigFile = "maintenance.pb" + VacuumTaskConfigFile = "task_vacuum.pb" + ECTaskConfigFile = "task_erasure_coding.pb" + BalanceTaskConfigFile = "task_balance.pb" + ReplicationTaskConfigFile = "task_replication.pb" + + // JSON reference files + MaintenanceConfigJSONFile = "maintenance.json" + VacuumTaskConfigJSONFile = "task_vacuum.json" + ECTaskConfigJSONFile = "task_erasure_coding.json" + BalanceTaskConfigJSONFile = "task_balance.json" + ReplicationTaskConfigJSONFile = "task_replication.json" + ConfigDirPermissions = 0755 ConfigFilePermissions = 0644 ) +// Task configuration types +type ( + VacuumTaskConfig = worker_pb.VacuumTaskConfig + ErasureCodingTaskConfig = worker_pb.ErasureCodingTaskConfig + BalanceTaskConfig = worker_pb.BalanceTaskConfig + ReplicationTaskConfig = worker_pb.ReplicationTaskConfig +) + // ConfigPersistence handles saving and loading configuration files type ConfigPersistence struct { dataDir string @@ -30,122 +57,67 @@ func NewConfigPersistence(dataDir string) *ConfigPersistence { } } -// SaveMaintenanceConfig saves maintenance configuration to JSON file +// SaveMaintenanceConfig saves maintenance configuration to protobuf file and JSON reference func (cp *ConfigPersistence) SaveMaintenanceConfig(config *MaintenanceConfig) error { if cp.dataDir == "" { return fmt.Errorf("no data directory specified, cannot save configuration") } - configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile) - - // Create directory if it doesn't exist - if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil { + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + if err := os.MkdirAll(confDir, ConfigDirPermissions); err != nil { return fmt.Errorf("failed to create config directory: %w", err) } - // Marshal configuration to JSON - configData, err := json.MarshalIndent(config, "", " ") - if err != nil { - return fmt.Errorf("failed to marshal maintenance config: %w", err) - } - - // Write to file - if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil { - return fmt.Errorf("failed to write maintenance config file: %w", err) - } - - glog.V(1).Infof("Saved maintenance configuration to %s", configPath) - return nil -} - -// LoadMaintenanceConfig loads maintenance configuration from JSON file -func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) { - if cp.dataDir == "" { - glog.V(1).Infof("No data directory specified, using default maintenance configuration") - return DefaultMaintenanceConfig(), nil - } - - configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile) - - // Check if file exists - if _, err := os.Stat(configPath); os.IsNotExist(err) { - glog.V(1).Infof("Maintenance config file does not exist, using defaults: %s", configPath) - return DefaultMaintenanceConfig(), nil - } - - // Read file - configData, err := os.ReadFile(configPath) + // Save as protobuf (primary format) + pbConfigPath := filepath.Join(confDir, MaintenanceConfigFile) + pbData, err := proto.Marshal(config) if err != nil { - return nil, fmt.Errorf("failed to read maintenance config file: %w", err) + return fmt.Errorf("failed to marshal maintenance config to protobuf: %w", err) } - // Unmarshal JSON - var config MaintenanceConfig - if err := json.Unmarshal(configData, &config); err != nil { - return nil, fmt.Errorf("failed to unmarshal maintenance config: %w", err) + if err := os.WriteFile(pbConfigPath, pbData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write protobuf config file: %w", err) } - glog.V(1).Infof("Loaded maintenance configuration from %s", configPath) - return &config, nil -} - -// SaveAdminConfig saves general admin configuration to JSON file -func (cp *ConfigPersistence) SaveAdminConfig(config map[string]interface{}) error { - if cp.dataDir == "" { - return fmt.Errorf("no data directory specified, cannot save configuration") - } - - configPath := filepath.Join(cp.dataDir, AdminConfigFile) - - // Create directory if it doesn't exist - if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil { - return fmt.Errorf("failed to create config directory: %w", err) - } - - // Marshal configuration to JSON - configData, err := json.MarshalIndent(config, "", " ") + // Save JSON reference copy for debugging + jsonConfigPath := filepath.Join(confDir, MaintenanceConfigJSONFile) + jsonData, err := protojson.MarshalOptions{ + Multiline: true, + Indent: " ", + EmitUnpopulated: true, + }.Marshal(config) if err != nil { - return fmt.Errorf("failed to marshal admin config: %w", err) + return fmt.Errorf("failed to marshal maintenance config to JSON: %w", err) } - // Write to file - if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil { - return fmt.Errorf("failed to write admin config file: %w", err) + if err := os.WriteFile(jsonConfigPath, jsonData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write JSON reference file: %w", err) } - glog.V(1).Infof("Saved admin configuration to %s", configPath) return nil } -// LoadAdminConfig loads general admin configuration from JSON file -func (cp *ConfigPersistence) LoadAdminConfig() (map[string]interface{}, error) { +// LoadMaintenanceConfig loads maintenance configuration from protobuf file +func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) { if cp.dataDir == "" { - glog.V(1).Infof("No data directory specified, using default admin configuration") - return make(map[string]interface{}), nil - } - - configPath := filepath.Join(cp.dataDir, AdminConfigFile) - - // Check if file exists - if _, err := os.Stat(configPath); os.IsNotExist(err) { - glog.V(1).Infof("Admin config file does not exist, using defaults: %s", configPath) - return make(map[string]interface{}), nil + return DefaultMaintenanceConfig(), nil } - // Read file - configData, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read admin config file: %w", err) - } + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, MaintenanceConfigFile) - // Unmarshal JSON - var config map[string]interface{} - if err := json.Unmarshal(configData, &config); err != nil { - return nil, fmt.Errorf("failed to unmarshal admin config: %w", err) + // Try to load from protobuf file + if configData, err := os.ReadFile(configPath); err == nil { + var config MaintenanceConfig + if err := proto.Unmarshal(configData, &config); err == nil { + // Always populate policy from separate task configuration files + config.Policy = buildPolicyFromTaskConfigs() + return &config, nil + } } - glog.V(1).Infof("Loaded admin configuration from %s", configPath) - return config, nil + // File doesn't exist or failed to load, use defaults + return DefaultMaintenanceConfig(), nil } // GetConfigPath returns the path to a configuration file @@ -153,24 +125,35 @@ func (cp *ConfigPersistence) GetConfigPath(filename string) string { if cp.dataDir == "" { return "" } - return filepath.Join(cp.dataDir, filename) + + // All configs go in conf subdirectory + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + return filepath.Join(confDir, filename) } -// ListConfigFiles returns all configuration files in the data directory +// ListConfigFiles returns all configuration files in the conf subdirectory func (cp *ConfigPersistence) ListConfigFiles() ([]string, error) { if cp.dataDir == "" { return nil, fmt.Errorf("no data directory specified") } - files, err := os.ReadDir(cp.dataDir) + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + files, err := os.ReadDir(confDir) if err != nil { + // If conf directory doesn't exist, return empty list + if os.IsNotExist(err) { + return []string{}, nil + } return nil, fmt.Errorf("failed to read config directory: %w", err) } var configFiles []string for _, file := range files { - if !file.IsDir() && filepath.Ext(file.Name()) == ".json" { - configFiles = append(configFiles, file.Name()) + if !file.IsDir() { + ext := filepath.Ext(file.Name()) + if ext == ".json" || ext == ".pb" { + configFiles = append(configFiles, file.Name()) + } } } @@ -183,7 +166,7 @@ func (cp *ConfigPersistence) BackupConfig(filename string) error { return fmt.Errorf("no data directory specified") } - configPath := filepath.Join(cp.dataDir, filename) + configPath := cp.GetConfigPath(filename) if _, err := os.Stat(configPath); os.IsNotExist(err) { return fmt.Errorf("config file does not exist: %s", filename) } @@ -191,7 +174,10 @@ func (cp *ConfigPersistence) BackupConfig(filename string) error { // Create backup filename with timestamp timestamp := time.Now().Format("2006-01-02_15-04-05") backupName := fmt.Sprintf("%s.backup_%s", filename, timestamp) - backupPath := filepath.Join(cp.dataDir, backupName) + + // Determine backup directory (conf subdirectory) + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + backupPath := filepath.Join(confDir, backupName) // Copy file configData, err := os.ReadFile(configPath) @@ -213,7 +199,10 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { return fmt.Errorf("no data directory specified") } - backupPath := filepath.Join(cp.dataDir, backupName) + // Determine backup path (conf subdirectory) + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + backupPath := filepath.Join(confDir, backupName) + if _, err := os.Stat(backupPath); os.IsNotExist(err) { return fmt.Errorf("backup file does not exist: %s", backupName) } @@ -225,7 +214,7 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { } // Write to config file - configPath := filepath.Join(cp.dataDir, filename) + configPath := cp.GetConfigPath(filename) if err := os.WriteFile(configPath, backupData, ConfigFilePermissions); err != nil { return fmt.Errorf("failed to restore config: %w", err) } @@ -234,6 +223,364 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error { return nil } +// SaveVacuumTaskConfig saves vacuum task configuration to protobuf file +func (cp *ConfigPersistence) SaveVacuumTaskConfig(config *VacuumTaskConfig) error { + return cp.saveTaskConfig(VacuumTaskConfigFile, config) +} + +// SaveVacuumTaskPolicy saves complete vacuum task policy to protobuf file +func (cp *ConfigPersistence) SaveVacuumTaskPolicy(policy *worker_pb.TaskPolicy) error { + return cp.saveTaskConfig(VacuumTaskConfigFile, policy) +} + +// LoadVacuumTaskConfig loads vacuum task configuration from protobuf file +func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) { + // Load as TaskPolicy and extract vacuum config + if taskPolicy, err := cp.LoadVacuumTaskPolicy(); err == nil && taskPolicy != nil { + if vacuumConfig := taskPolicy.GetVacuumConfig(); vacuumConfig != nil { + return vacuumConfig, nil + } + } + + // Return default config if no valid config found + return &VacuumTaskConfig{ + GarbageThreshold: 0.3, + MinVolumeAgeHours: 24, + MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days + }, nil +} + +// LoadVacuumTaskPolicy loads complete vacuum task policy from protobuf file +func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) { + if cp.dataDir == "" { + // Return default policy if no data directory + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 2, + RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds + CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ + VacuumConfig: &worker_pb.VacuumTaskConfig{ + GarbageThreshold: 0.3, + MinVolumeAgeHours: 24, + MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days + }, + }, + }, nil + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, VacuumTaskConfigFile) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + // Return default policy if file doesn't exist + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 2, + RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds + CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ + VacuumConfig: &worker_pb.VacuumTaskConfig{ + GarbageThreshold: 0.3, + MinVolumeAgeHours: 24, + MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days + }, + }, + }, nil + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read vacuum task config file: %w", err) + } + + // Try to unmarshal as TaskPolicy + var policy worker_pb.TaskPolicy + if err := proto.Unmarshal(configData, &policy); err == nil { + // Validate that it's actually a TaskPolicy with vacuum config + if policy.GetVacuumConfig() != nil { + glog.V(1).Infof("Loaded vacuum task policy from %s", configPath) + return &policy, nil + } + } + + return nil, fmt.Errorf("failed to unmarshal vacuum task configuration") +} + +// SaveErasureCodingTaskConfig saves EC task configuration to protobuf file +func (cp *ConfigPersistence) SaveErasureCodingTaskConfig(config *ErasureCodingTaskConfig) error { + return cp.saveTaskConfig(ECTaskConfigFile, config) +} + +// SaveErasureCodingTaskPolicy saves complete EC task policy to protobuf file +func (cp *ConfigPersistence) SaveErasureCodingTaskPolicy(policy *worker_pb.TaskPolicy) error { + return cp.saveTaskConfig(ECTaskConfigFile, policy) +} + +// LoadErasureCodingTaskConfig loads EC task configuration from protobuf file +func (cp *ConfigPersistence) LoadErasureCodingTaskConfig() (*ErasureCodingTaskConfig, error) { + // Load as TaskPolicy and extract EC config + if taskPolicy, err := cp.LoadErasureCodingTaskPolicy(); err == nil && taskPolicy != nil { + if ecConfig := taskPolicy.GetErasureCodingConfig(); ecConfig != nil { + return ecConfig, nil + } + } + + // Return default config if no valid config found + return &ErasureCodingTaskConfig{ + FullnessRatio: 0.9, + QuietForSeconds: 3600, + MinVolumeSizeMb: 1024, + CollectionFilter: "", + }, nil +} + +// LoadErasureCodingTaskPolicy loads complete EC task policy from protobuf file +func (cp *ConfigPersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) { + if cp.dataDir == "" { + // Return default policy if no data directory + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 1, + RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds + CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ + ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ + FullnessRatio: 0.9, + QuietForSeconds: 3600, + MinVolumeSizeMb: 1024, + CollectionFilter: "", + }, + }, + }, nil + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, ECTaskConfigFile) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + // Return default policy if file doesn't exist + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 1, + RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds + CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ + ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ + FullnessRatio: 0.9, + QuietForSeconds: 3600, + MinVolumeSizeMb: 1024, + CollectionFilter: "", + }, + }, + }, nil + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read EC task config file: %w", err) + } + + // Try to unmarshal as TaskPolicy + var policy worker_pb.TaskPolicy + if err := proto.Unmarshal(configData, &policy); err == nil { + // Validate that it's actually a TaskPolicy with EC config + if policy.GetErasureCodingConfig() != nil { + glog.V(1).Infof("Loaded EC task policy from %s", configPath) + return &policy, nil + } + } + + return nil, fmt.Errorf("failed to unmarshal EC task configuration") +} + +// SaveBalanceTaskConfig saves balance task configuration to protobuf file +func (cp *ConfigPersistence) SaveBalanceTaskConfig(config *BalanceTaskConfig) error { + return cp.saveTaskConfig(BalanceTaskConfigFile, config) +} + +// SaveBalanceTaskPolicy saves complete balance task policy to protobuf file +func (cp *ConfigPersistence) SaveBalanceTaskPolicy(policy *worker_pb.TaskPolicy) error { + return cp.saveTaskConfig(BalanceTaskConfigFile, policy) +} + +// LoadBalanceTaskConfig loads balance task configuration from protobuf file +func (cp *ConfigPersistence) LoadBalanceTaskConfig() (*BalanceTaskConfig, error) { + // Load as TaskPolicy and extract balance config + if taskPolicy, err := cp.LoadBalanceTaskPolicy(); err == nil && taskPolicy != nil { + if balanceConfig := taskPolicy.GetBalanceConfig(); balanceConfig != nil { + return balanceConfig, nil + } + } + + // Return default config if no valid config found + return &BalanceTaskConfig{ + ImbalanceThreshold: 0.1, + MinServerCount: 2, + }, nil +} + +// LoadBalanceTaskPolicy loads complete balance task policy from protobuf file +func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) { + if cp.dataDir == "" { + // Return default policy if no data directory + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 1, + RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds + CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ + BalanceConfig: &worker_pb.BalanceTaskConfig{ + ImbalanceThreshold: 0.1, + MinServerCount: 2, + }, + }, + }, nil + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, BalanceTaskConfigFile) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + // Return default policy if file doesn't exist + return &worker_pb.TaskPolicy{ + Enabled: true, + MaxConcurrent: 1, + RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds + CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds + TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ + BalanceConfig: &worker_pb.BalanceTaskConfig{ + ImbalanceThreshold: 0.1, + MinServerCount: 2, + }, + }, + }, nil + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read balance task config file: %w", err) + } + + // Try to unmarshal as TaskPolicy + var policy worker_pb.TaskPolicy + if err := proto.Unmarshal(configData, &policy); err == nil { + // Validate that it's actually a TaskPolicy with balance config + if policy.GetBalanceConfig() != nil { + glog.V(1).Infof("Loaded balance task policy from %s", configPath) + return &policy, nil + } + } + + return nil, fmt.Errorf("failed to unmarshal balance task configuration") +} + +// SaveReplicationTaskConfig saves replication task configuration to protobuf file +func (cp *ConfigPersistence) SaveReplicationTaskConfig(config *ReplicationTaskConfig) error { + return cp.saveTaskConfig(ReplicationTaskConfigFile, config) +} + +// LoadReplicationTaskConfig loads replication task configuration from protobuf file +func (cp *ConfigPersistence) LoadReplicationTaskConfig() (*ReplicationTaskConfig, error) { + var config ReplicationTaskConfig + err := cp.loadTaskConfig(ReplicationTaskConfigFile, &config) + if err != nil { + // Return default config if file doesn't exist + if os.IsNotExist(err) { + return &ReplicationTaskConfig{ + TargetReplicaCount: 1, + }, nil + } + return nil, err + } + return &config, nil +} + +// saveTaskConfig is a generic helper for saving task configurations with both protobuf and JSON reference +func (cp *ConfigPersistence) saveTaskConfig(filename string, config proto.Message) error { + if cp.dataDir == "" { + return fmt.Errorf("no data directory specified, cannot save task configuration") + } + + // Create conf subdirectory path + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, filename) + + // Generate JSON reference filename + jsonFilename := filename[:len(filename)-3] + ".json" // Replace .pb with .json + jsonPath := filepath.Join(confDir, jsonFilename) + + // Create conf directory if it doesn't exist + if err := os.MkdirAll(confDir, ConfigDirPermissions); err != nil { + return fmt.Errorf("failed to create config directory: %w", err) + } + + // Marshal configuration to protobuf binary format + configData, err := proto.Marshal(config) + if err != nil { + return fmt.Errorf("failed to marshal task config: %w", err) + } + + // Write protobuf file + if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil { + return fmt.Errorf("failed to write task config file: %w", err) + } + + // Marshal configuration to JSON for reference + marshaler := protojson.MarshalOptions{ + Multiline: true, + Indent: " ", + EmitUnpopulated: true, + } + jsonData, err := marshaler.Marshal(config) + if err != nil { + glog.Warningf("Failed to marshal task config to JSON reference: %v", err) + } else { + // Write JSON reference file + if err := os.WriteFile(jsonPath, jsonData, ConfigFilePermissions); err != nil { + glog.Warningf("Failed to write task config JSON reference: %v", err) + } + } + + glog.V(1).Infof("Saved task configuration to %s (with JSON reference)", configPath) + return nil +} + +// loadTaskConfig is a generic helper for loading task configurations from conf subdirectory +func (cp *ConfigPersistence) loadTaskConfig(filename string, config proto.Message) error { + if cp.dataDir == "" { + return os.ErrNotExist // Will trigger default config return + } + + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + configPath := filepath.Join(confDir, filename) + + // Check if file exists + if _, err := os.Stat(configPath); os.IsNotExist(err) { + return err // Will trigger default config return + } + + // Read file + configData, err := os.ReadFile(configPath) + if err != nil { + return fmt.Errorf("failed to read task config file: %w", err) + } + + // Unmarshal protobuf binary data + if err := proto.Unmarshal(configData, config); err != nil { + return fmt.Errorf("failed to unmarshal task config: %w", err) + } + + glog.V(1).Infof("Loaded task configuration from %s", configPath) + return nil +} + // GetDataDir returns the data directory path func (cp *ConfigPersistence) GetDataDir() string { return cp.dataDir @@ -249,6 +596,7 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { info := map[string]interface{}{ "data_dir_configured": cp.IsConfigured(), "data_dir": cp.dataDir, + "config_subdir": ConfigSubdir, } if cp.IsConfigured() { @@ -256,10 +604,18 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { if _, err := os.Stat(cp.dataDir); err == nil { info["data_dir_exists"] = true - // List config files - configFiles, err := cp.ListConfigFiles() - if err == nil { - info["config_files"] = configFiles + // Check if conf subdirectory exists + confDir := filepath.Join(cp.dataDir, ConfigSubdir) + if _, err := os.Stat(confDir); err == nil { + info["conf_dir_exists"] = true + + // List config files + configFiles, err := cp.ListConfigFiles() + if err == nil { + info["config_files"] = configFiles + } + } else { + info["conf_dir_exists"] = false } } else { info["data_dir_exists"] = false @@ -268,3 +624,67 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} { return info } + +// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy +func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy { + policy := &worker_pb.MaintenancePolicy{ + GlobalMaxConcurrent: 4, + DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds + DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds + TaskPolicies: make(map[string]*worker_pb.TaskPolicy), + } + + // Load vacuum task configuration + if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil { + policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{ + Enabled: vacuumConfig.Enabled, + MaxConcurrent: int32(vacuumConfig.MaxConcurrent), + RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), + CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ + VacuumConfig: &worker_pb.VacuumTaskConfig{ + GarbageThreshold: float64(vacuumConfig.GarbageThreshold), + MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours + MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds), + }, + }, + } + } + + // Load erasure coding task configuration + if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil { + policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{ + Enabled: ecConfig.Enabled, + MaxConcurrent: int32(ecConfig.MaxConcurrent), + RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds), + CheckIntervalSeconds: int32(ecConfig.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ + ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ + FullnessRatio: float64(ecConfig.FullnessRatio), + QuietForSeconds: int32(ecConfig.QuietForSeconds), + MinVolumeSizeMb: int32(ecConfig.MinSizeMB), + CollectionFilter: ecConfig.CollectionFilter, + }, + }, + } + } + + // Load balance task configuration + if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil { + policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{ + Enabled: balanceConfig.Enabled, + MaxConcurrent: int32(balanceConfig.MaxConcurrent), + RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), + CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ + BalanceConfig: &worker_pb.BalanceTaskConfig{ + ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold), + MinServerCount: int32(balanceConfig.MinServerCount), + }, + }, + } + } + + glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies)) + return policy +} diff --git a/weed/admin/dash/ec_shard_management.go b/weed/admin/dash/ec_shard_management.go new file mode 100644 index 000000000..272890cf0 --- /dev/null +++ b/weed/admin/dash/ec_shard_management.go @@ -0,0 +1,734 @@ +package dash + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +// GetClusterEcShards retrieves cluster EC shards data with pagination, sorting, and filtering +func (s *AdminServer) GetClusterEcShards(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcShardsData, error) { + // Set defaults + if page < 1 { + page = 1 + } + if pageSize < 1 || pageSize > 1000 { + pageSize = 100 + } + if sortBy == "" { + sortBy = "volume_id" + } + if sortOrder == "" { + sortOrder = "asc" + } + + var ecShards []EcShardWithInfo + volumeShardsMap := make(map[uint32]map[int]bool) // volumeId -> set of shards present + volumesWithAllShards := 0 + volumesWithMissingShards := 0 + + // Get detailed EC shard information via gRPC + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + // Process EC shard information + for _, ecShardInfo := range diskInfo.EcShardInfos { + volumeId := ecShardInfo.Id + + // Initialize volume shards map if needed + if volumeShardsMap[volumeId] == nil { + volumeShardsMap[volumeId] = make(map[int]bool) + } + + // Create individual shard entries for each shard this server has + shardBits := ecShardInfo.EcIndexBits + for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ { + if (shardBits & (1 << uint(shardId))) != 0 { + // Mark this shard as present for this volume + volumeShardsMap[volumeId][shardId] = true + + ecShard := EcShardWithInfo{ + VolumeID: volumeId, + ShardID: uint32(shardId), + Collection: ecShardInfo.Collection, + Size: 0, // EC shards don't have individual size in the API response + Server: node.Id, + DataCenter: dc.Id, + Rack: rack.Id, + DiskType: diskInfo.Type, + ModifiedTime: 0, // Not available in current API + EcIndexBits: ecShardInfo.EcIndexBits, + ShardCount: getShardCount(ecShardInfo.EcIndexBits), + } + ecShards = append(ecShards, ecShard) + } + } + } + } + } + } + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + // Calculate volume-level completeness (across all servers) + volumeCompleteness := make(map[uint32]bool) + volumeMissingShards := make(map[uint32][]int) + + for volumeId, shardsPresent := range volumeShardsMap { + var missingShards []int + shardCount := len(shardsPresent) + + // Find which shards are missing for this volume across ALL servers + for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ { + if !shardsPresent[shardId] { + missingShards = append(missingShards, shardId) + } + } + + isComplete := (shardCount == erasure_coding.TotalShardsCount) + volumeCompleteness[volumeId] = isComplete + volumeMissingShards[volumeId] = missingShards + + if isComplete { + volumesWithAllShards++ + } else { + volumesWithMissingShards++ + } + } + + // Update completeness info for each shard based on volume-level completeness + for i := range ecShards { + volumeId := ecShards[i].VolumeID + ecShards[i].IsComplete = volumeCompleteness[volumeId] + ecShards[i].MissingShards = volumeMissingShards[volumeId] + } + + // Filter by collection if specified + if collection != "" { + var filteredShards []EcShardWithInfo + for _, shard := range ecShards { + if shard.Collection == collection { + filteredShards = append(filteredShards, shard) + } + } + ecShards = filteredShards + } + + // Sort the results + sortEcShards(ecShards, sortBy, sortOrder) + + // Calculate statistics for conditional display + dataCenters := make(map[string]bool) + racks := make(map[string]bool) + collections := make(map[string]bool) + + for _, shard := range ecShards { + dataCenters[shard.DataCenter] = true + racks[shard.Rack] = true + if shard.Collection != "" { + collections[shard.Collection] = true + } + } + + // Pagination + totalShards := len(ecShards) + totalPages := (totalShards + pageSize - 1) / pageSize + startIndex := (page - 1) * pageSize + endIndex := startIndex + pageSize + if endIndex > totalShards { + endIndex = totalShards + } + + if startIndex >= totalShards { + startIndex = 0 + endIndex = 0 + } + + paginatedShards := ecShards[startIndex:endIndex] + + // Build response + data := &ClusterEcShardsData{ + EcShards: paginatedShards, + TotalShards: totalShards, + TotalVolumes: len(volumeShardsMap), + LastUpdated: time.Now(), + + // Pagination + CurrentPage: page, + TotalPages: totalPages, + PageSize: pageSize, + + // Sorting + SortBy: sortBy, + SortOrder: sortOrder, + + // Statistics + DataCenterCount: len(dataCenters), + RackCount: len(racks), + CollectionCount: len(collections), + + // Conditional display flags + ShowDataCenterColumn: len(dataCenters) > 1, + ShowRackColumn: len(racks) > 1, + ShowCollectionColumn: len(collections) > 1 || collection != "", + + // Filtering + FilterCollection: collection, + + // EC specific statistics + ShardsPerVolume: make(map[uint32]int), // This will be recalculated below + VolumesWithAllShards: volumesWithAllShards, + VolumesWithMissingShards: volumesWithMissingShards, + } + + // Recalculate ShardsPerVolume for the response + for volumeId, shardsPresent := range volumeShardsMap { + data.ShardsPerVolume[volumeId] = len(shardsPresent) + } + + // Set single values when only one exists + if len(dataCenters) == 1 { + for dc := range dataCenters { + data.SingleDataCenter = dc + break + } + } + if len(racks) == 1 { + for rack := range racks { + data.SingleRack = rack + break + } + } + if len(collections) == 1 { + for col := range collections { + data.SingleCollection = col + break + } + } + + return data, nil +} + +// GetClusterEcVolumes retrieves cluster EC volumes data grouped by volume ID with shard locations +func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcVolumesData, error) { + // Set defaults + if page < 1 { + page = 1 + } + if pageSize < 1 || pageSize > 1000 { + pageSize = 100 + } + if sortBy == "" { + sortBy = "volume_id" + } + if sortOrder == "" { + sortOrder = "asc" + } + + volumeData := make(map[uint32]*EcVolumeWithShards) + totalShards := 0 + + // Get detailed EC shard information via gRPC + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + // Process EC shard information + for _, ecShardInfo := range diskInfo.EcShardInfos { + volumeId := ecShardInfo.Id + + // Initialize volume data if needed + if volumeData[volumeId] == nil { + volumeData[volumeId] = &EcVolumeWithShards{ + VolumeID: volumeId, + Collection: ecShardInfo.Collection, + TotalShards: 0, + IsComplete: false, + MissingShards: []int{}, + ShardLocations: make(map[int]string), + ShardSizes: make(map[int]int64), + DataCenters: []string{}, + Servers: []string{}, + Racks: []string{}, + } + } + + volume := volumeData[volumeId] + + // Track data centers and servers + dcExists := false + for _, existingDc := range volume.DataCenters { + if existingDc == dc.Id { + dcExists = true + break + } + } + if !dcExists { + volume.DataCenters = append(volume.DataCenters, dc.Id) + } + + serverExists := false + for _, existingServer := range volume.Servers { + if existingServer == node.Id { + serverExists = true + break + } + } + if !serverExists { + volume.Servers = append(volume.Servers, node.Id) + } + + // Track racks + rackExists := false + for _, existingRack := range volume.Racks { + if existingRack == rack.Id { + rackExists = true + break + } + } + if !rackExists { + volume.Racks = append(volume.Racks, rack.Id) + } + + // Process each shard this server has for this volume + shardBits := ecShardInfo.EcIndexBits + for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ { + if (shardBits & (1 << uint(shardId))) != 0 { + // Record shard location + volume.ShardLocations[shardId] = node.Id + totalShards++ + } + } + } + } + } + } + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + // Collect shard size information from volume servers + for volumeId, volume := range volumeData { + // Group servers by volume to minimize gRPC calls + serverHasVolume := make(map[string]bool) + for _, server := range volume.Servers { + serverHasVolume[server] = true + } + + // Query each server for shard sizes + for server := range serverHasVolume { + err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error { + resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{ + VolumeId: volumeId, + }) + if err != nil { + glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeId, err) + return nil // Continue with other servers, don't fail the entire request + } + + // Update shard sizes + for _, shardInfo := range resp.EcShardInfos { + volume.ShardSizes[int(shardInfo.ShardId)] = shardInfo.Size + } + + return nil + }) + if err != nil { + glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err) + } + } + } + + // Calculate completeness for each volume + completeVolumes := 0 + incompleteVolumes := 0 + + for _, volume := range volumeData { + volume.TotalShards = len(volume.ShardLocations) + + // Find missing shards + var missingShards []int + for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ { + if _, exists := volume.ShardLocations[shardId]; !exists { + missingShards = append(missingShards, shardId) + } + } + + volume.MissingShards = missingShards + volume.IsComplete = (len(missingShards) == 0) + + if volume.IsComplete { + completeVolumes++ + } else { + incompleteVolumes++ + } + } + + // Convert map to slice + var ecVolumes []EcVolumeWithShards + for _, volume := range volumeData { + // Filter by collection if specified + if collection == "" || volume.Collection == collection { + ecVolumes = append(ecVolumes, *volume) + } + } + + // Sort the results + sortEcVolumes(ecVolumes, sortBy, sortOrder) + + // Calculate statistics for conditional display + dataCenters := make(map[string]bool) + collections := make(map[string]bool) + + for _, volume := range ecVolumes { + for _, dc := range volume.DataCenters { + dataCenters[dc] = true + } + if volume.Collection != "" { + collections[volume.Collection] = true + } + } + + // Pagination + totalVolumes := len(ecVolumes) + totalPages := (totalVolumes + pageSize - 1) / pageSize + startIndex := (page - 1) * pageSize + endIndex := startIndex + pageSize + if endIndex > totalVolumes { + endIndex = totalVolumes + } + + if startIndex >= totalVolumes { + startIndex = 0 + endIndex = 0 + } + + paginatedVolumes := ecVolumes[startIndex:endIndex] + + // Build response + data := &ClusterEcVolumesData{ + EcVolumes: paginatedVolumes, + TotalVolumes: totalVolumes, + LastUpdated: time.Now(), + + // Pagination + Page: page, + PageSize: pageSize, + TotalPages: totalPages, + + // Sorting + SortBy: sortBy, + SortOrder: sortOrder, + + // Filtering + Collection: collection, + + // Conditional display flags + ShowDataCenterColumn: len(dataCenters) > 1, + ShowRackColumn: false, // We don't track racks in this view for simplicity + ShowCollectionColumn: len(collections) > 1 || collection != "", + + // Statistics + CompleteVolumes: completeVolumes, + IncompleteVolumes: incompleteVolumes, + TotalShards: totalShards, + } + + return data, nil +} + +// sortEcVolumes sorts EC volumes based on the specified field and order +func sortEcVolumes(volumes []EcVolumeWithShards, sortBy string, sortOrder string) { + sort.Slice(volumes, func(i, j int) bool { + var less bool + switch sortBy { + case "volume_id": + less = volumes[i].VolumeID < volumes[j].VolumeID + case "collection": + if volumes[i].Collection == volumes[j].Collection { + less = volumes[i].VolumeID < volumes[j].VolumeID + } else { + less = volumes[i].Collection < volumes[j].Collection + } + case "total_shards": + if volumes[i].TotalShards == volumes[j].TotalShards { + less = volumes[i].VolumeID < volumes[j].VolumeID + } else { + less = volumes[i].TotalShards < volumes[j].TotalShards + } + case "completeness": + // Complete volumes first, then by volume ID + if volumes[i].IsComplete == volumes[j].IsComplete { + less = volumes[i].VolumeID < volumes[j].VolumeID + } else { + less = volumes[i].IsComplete && !volumes[j].IsComplete + } + default: + less = volumes[i].VolumeID < volumes[j].VolumeID + } + + if sortOrder == "desc" { + return !less + } + return less + }) +} + +// getShardCount returns the number of shards represented by the bitmap +func getShardCount(ecIndexBits uint32) int { + count := 0 + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + if (ecIndexBits & (1 << uint(i))) != 0 { + count++ + } + } + return count +} + +// getMissingShards returns a slice of missing shard IDs for a volume +func getMissingShards(ecIndexBits uint32) []int { + var missing []int + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + if (ecIndexBits & (1 << uint(i))) == 0 { + missing = append(missing, i) + } + } + return missing +} + +// sortEcShards sorts EC shards based on the specified field and order +func sortEcShards(shards []EcShardWithInfo, sortBy string, sortOrder string) { + sort.Slice(shards, func(i, j int) bool { + var less bool + switch sortBy { + case "shard_id": + less = shards[i].ShardID < shards[j].ShardID + case "server": + if shards[i].Server == shards[j].Server { + less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID + } else { + less = shards[i].Server < shards[j].Server + } + case "data_center": + if shards[i].DataCenter == shards[j].DataCenter { + less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID + } else { + less = shards[i].DataCenter < shards[j].DataCenter + } + case "rack": + if shards[i].Rack == shards[j].Rack { + less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID + } else { + less = shards[i].Rack < shards[j].Rack + } + default: + less = shards[i].ShardID < shards[j].ShardID + } + + if sortOrder == "desc" { + return !less + } + return less + }) +} + +// GetEcVolumeDetails retrieves detailed information about a specific EC volume +func (s *AdminServer) GetEcVolumeDetails(volumeID uint32, sortBy string, sortOrder string) (*EcVolumeDetailsData, error) { + // Set defaults + if sortBy == "" { + sortBy = "shard_id" + } + if sortOrder == "" { + sortOrder = "asc" + } + + var shards []EcShardWithInfo + var collection string + dataCenters := make(map[string]bool) + servers := make(map[string]bool) + + // Get detailed EC shard information for the specific volume via gRPC + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + // Process EC shard information for this specific volume + for _, ecShardInfo := range diskInfo.EcShardInfos { + if ecShardInfo.Id == volumeID { + collection = ecShardInfo.Collection + dataCenters[dc.Id] = true + servers[node.Id] = true + + // Create individual shard entries for each shard this server has + shardBits := ecShardInfo.EcIndexBits + for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ { + if (shardBits & (1 << uint(shardId))) != 0 { + ecShard := EcShardWithInfo{ + VolumeID: ecShardInfo.Id, + ShardID: uint32(shardId), + Collection: ecShardInfo.Collection, + Size: 0, // EC shards don't have individual size in the API response + Server: node.Id, + DataCenter: dc.Id, + Rack: rack.Id, + DiskType: diskInfo.Type, + ModifiedTime: 0, // Not available in current API + EcIndexBits: ecShardInfo.EcIndexBits, + ShardCount: getShardCount(ecShardInfo.EcIndexBits), + } + shards = append(shards, ecShard) + } + } + } + } + } + } + } + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + if len(shards) == 0 { + return nil, fmt.Errorf("EC volume %d not found", volumeID) + } + + // Collect shard size information from volume servers + shardSizeMap := make(map[string]map[uint32]uint64) // server -> shardId -> size + for _, shard := range shards { + server := shard.Server + if _, exists := shardSizeMap[server]; !exists { + // Query this server for shard sizes + err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error { + resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{ + VolumeId: volumeID, + }) + if err != nil { + glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeID, err) + return nil // Continue with other servers, don't fail the entire request + } + + // Store shard sizes for this server + shardSizeMap[server] = make(map[uint32]uint64) + for _, shardInfo := range resp.EcShardInfos { + shardSizeMap[server][shardInfo.ShardId] = uint64(shardInfo.Size) + } + + return nil + }) + if err != nil { + glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err) + } + } + } + + // Update shard sizes in the shards array + for i := range shards { + server := shards[i].Server + shardId := shards[i].ShardID + if serverSizes, exists := shardSizeMap[server]; exists { + if size, exists := serverSizes[shardId]; exists { + shards[i].Size = size + } + } + } + + // Calculate completeness based on unique shard IDs + foundShards := make(map[int]bool) + for _, shard := range shards { + foundShards[int(shard.ShardID)] = true + } + + totalUniqueShards := len(foundShards) + isComplete := (totalUniqueShards == erasure_coding.TotalShardsCount) + + // Calculate missing shards + var missingShards []int + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + if !foundShards[i] { + missingShards = append(missingShards, i) + } + } + + // Update completeness info for each shard + for i := range shards { + shards[i].IsComplete = isComplete + shards[i].MissingShards = missingShards + } + + // Sort shards based on parameters + sortEcShards(shards, sortBy, sortOrder) + + // Convert maps to slices + var dcList []string + for dc := range dataCenters { + dcList = append(dcList, dc) + } + var serverList []string + for server := range servers { + serverList = append(serverList, server) + } + + data := &EcVolumeDetailsData{ + VolumeID: volumeID, + Collection: collection, + Shards: shards, + TotalShards: totalUniqueShards, + IsComplete: isComplete, + MissingShards: missingShards, + DataCenters: dcList, + Servers: serverList, + LastUpdated: time.Now(), + SortBy: sortBy, + SortOrder: sortOrder, + } + + return data, nil +} diff --git a/weed/admin/dash/middleware.go b/weed/admin/dash/middleware.go index ce538d7ca..a4cfedfd0 100644 --- a/weed/admin/dash/middleware.go +++ b/weed/admin/dash/middleware.go @@ -25,3 +25,26 @@ func RequireAuth() gin.HandlerFunc { c.Next() } } + +// RequireAuthAPI checks if user is authenticated for API endpoints +// Returns JSON error instead of redirecting to login page +func RequireAuthAPI() gin.HandlerFunc { + return func(c *gin.Context) { + session := sessions.Default(c) + authenticated := session.Get("authenticated") + username := session.Get("username") + + if authenticated != true || username == nil { + c.JSON(http.StatusUnauthorized, gin.H{ + "error": "Authentication required", + "message": "Please log in to access this endpoint", + }) + c.Abort() + return + } + + // Set username in context for use in handlers + c.Set("username", username) + c.Next() + } +} diff --git a/weed/admin/dash/types.go b/weed/admin/dash/types.go index 60f499229..f098fad8c 100644 --- a/weed/admin/dash/types.go +++ b/weed/admin/dash/types.go @@ -135,6 +135,84 @@ type ClusterVolumesData struct { FilterCollection string `json:"filter_collection"` } +// ClusterEcShardsData represents the data for the cluster EC shards page +type ClusterEcShardsData struct { + Username string `json:"username"` + EcShards []EcShardWithInfo `json:"ec_shards"` + TotalShards int `json:"total_shards"` + TotalVolumes int `json:"total_volumes"` + LastUpdated time.Time `json:"last_updated"` + + // Pagination + CurrentPage int `json:"current_page"` + TotalPages int `json:"total_pages"` + PageSize int `json:"page_size"` + + // Sorting + SortBy string `json:"sort_by"` + SortOrder string `json:"sort_order"` + + // Statistics + DataCenterCount int `json:"datacenter_count"` + RackCount int `json:"rack_count"` + CollectionCount int `json:"collection_count"` + + // Conditional display flags + ShowDataCenterColumn bool `json:"show_datacenter_column"` + ShowRackColumn bool `json:"show_rack_column"` + ShowCollectionColumn bool `json:"show_collection_column"` + + // Single values when only one exists + SingleDataCenter string `json:"single_datacenter"` + SingleRack string `json:"single_rack"` + SingleCollection string `json:"single_collection"` + + // Filtering + FilterCollection string `json:"filter_collection"` + + // EC specific statistics + ShardsPerVolume map[uint32]int `json:"shards_per_volume"` // VolumeID -> shard count + VolumesWithAllShards int `json:"volumes_with_all_shards"` // Volumes with all 14 shards + VolumesWithMissingShards int `json:"volumes_with_missing_shards"` // Volumes missing shards +} + +// EcShardWithInfo represents an EC shard with its topology information +type EcShardWithInfo struct { + VolumeID uint32 `json:"volume_id"` + ShardID uint32 `json:"shard_id"` + Collection string `json:"collection"` + Size uint64 `json:"size"` + Server string `json:"server"` + DataCenter string `json:"datacenter"` + Rack string `json:"rack"` + DiskType string `json:"disk_type"` + ModifiedTime int64 `json:"modified_time"` + + // EC specific fields + EcIndexBits uint32 `json:"ec_index_bits"` // Bitmap of which shards this server has + ShardCount int `json:"shard_count"` // Number of shards this server has for this volume + IsComplete bool `json:"is_complete"` // True if this volume has all 14 shards + MissingShards []int `json:"missing_shards"` // List of missing shard IDs +} + +// EcVolumeDetailsData represents the data for the EC volume details page +type EcVolumeDetailsData struct { + Username string `json:"username"` + VolumeID uint32 `json:"volume_id"` + Collection string `json:"collection"` + Shards []EcShardWithInfo `json:"shards"` + TotalShards int `json:"total_shards"` + IsComplete bool `json:"is_complete"` + MissingShards []int `json:"missing_shards"` + DataCenters []string `json:"datacenters"` + Servers []string `json:"servers"` + LastUpdated time.Time `json:"last_updated"` + + // Sorting + SortBy string `json:"sort_by"` + SortOrder string `json:"sort_order"` +} + type VolumeDetailsData struct { Volume VolumeWithTopology `json:"volume"` Replicas []VolumeWithTopology `json:"replicas"` @@ -145,12 +223,13 @@ type VolumeDetailsData struct { // Collection management structures type CollectionInfo struct { - Name string `json:"name"` - DataCenter string `json:"datacenter"` - VolumeCount int `json:"volume_count"` - FileCount int64 `json:"file_count"` - TotalSize int64 `json:"total_size"` - DiskTypes []string `json:"disk_types"` + Name string `json:"name"` + DataCenter string `json:"datacenter"` + VolumeCount int `json:"volume_count"` + EcVolumeCount int `json:"ec_volume_count"` + FileCount int64 `json:"file_count"` + TotalSize int64 `json:"total_size"` + DiskTypes []string `json:"disk_types"` } type ClusterCollectionsData struct { @@ -158,6 +237,7 @@ type ClusterCollectionsData struct { Collections []CollectionInfo `json:"collections"` TotalCollections int `json:"total_collections"` TotalVolumes int `json:"total_volumes"` + TotalEcVolumes int `json:"total_ec_volumes"` TotalFiles int64 `json:"total_files"` TotalSize int64 `json:"total_size"` LastUpdated time.Time `json:"last_updated"` @@ -376,3 +456,74 @@ type MaintenanceWorkersData struct { } // Maintenance system types are now in weed/admin/maintenance package + +// EcVolumeWithShards represents an EC volume with its shard distribution +type EcVolumeWithShards struct { + VolumeID uint32 `json:"volume_id"` + Collection string `json:"collection"` + TotalShards int `json:"total_shards"` + IsComplete bool `json:"is_complete"` + MissingShards []int `json:"missing_shards"` + ShardLocations map[int]string `json:"shard_locations"` // shardId -> server + ShardSizes map[int]int64 `json:"shard_sizes"` // shardId -> size in bytes + DataCenters []string `json:"data_centers"` + Servers []string `json:"servers"` + Racks []string `json:"racks"` + ModifiedTime int64 `json:"modified_time"` +} + +// ClusterEcVolumesData represents the response for clustered EC volumes view +type ClusterEcVolumesData struct { + EcVolumes []EcVolumeWithShards `json:"ec_volumes"` + TotalVolumes int `json:"total_volumes"` + LastUpdated time.Time `json:"last_updated"` + + // Pagination + Page int `json:"page"` + PageSize int `json:"page_size"` + TotalPages int `json:"total_pages"` + + // Sorting + SortBy string `json:"sort_by"` + SortOrder string `json:"sort_order"` + + // Filtering + Collection string `json:"collection"` + + // Conditional display flags + ShowDataCenterColumn bool `json:"show_datacenter_column"` + ShowRackColumn bool `json:"show_rack_column"` + ShowCollectionColumn bool `json:"show_collection_column"` + + // Statistics + CompleteVolumes int `json:"complete_volumes"` + IncompleteVolumes int `json:"incomplete_volumes"` + TotalShards int `json:"total_shards"` + + // User context + Username string `json:"username"` +} + +// Collection detail page structures +type CollectionDetailsData struct { + Username string `json:"username"` + CollectionName string `json:"collection_name"` + RegularVolumes []VolumeWithTopology `json:"regular_volumes"` + EcVolumes []EcVolumeWithShards `json:"ec_volumes"` + TotalVolumes int `json:"total_volumes"` + TotalEcVolumes int `json:"total_ec_volumes"` + TotalFiles int64 `json:"total_files"` + TotalSize int64 `json:"total_size"` + DataCenters []string `json:"data_centers"` + DiskTypes []string `json:"disk_types"` + LastUpdated time.Time `json:"last_updated"` + + // Pagination + Page int `json:"page"` + PageSize int `json:"page_size"` + TotalPages int `json:"total_pages"` + + // Sorting + SortBy string `json:"sort_by"` + SortOrder string `json:"sort_order"` +} diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index 36f97261a..3b4312235 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -319,27 +319,41 @@ func (s *WorkerGrpcServer) handleHeartbeat(conn *WorkerConnection, heartbeat *wo // handleTaskRequest processes task requests from workers func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *worker_pb.TaskRequest) { + // glog.Infof("DEBUG handleTaskRequest: Worker %s requesting tasks with capabilities %v", conn.workerID, conn.capabilities) + if s.adminServer.maintenanceManager == nil { + glog.Infof("DEBUG handleTaskRequest: maintenance manager is nil") return } // Get next task from maintenance manager task := s.adminServer.maintenanceManager.GetNextTask(conn.workerID, conn.capabilities) + // glog.Infof("DEBUG handleTaskRequest: GetNextTask returned task: %v", task != nil) if task != nil { + glog.Infof("DEBUG handleTaskRequest: Assigning task %s (type: %s) to worker %s", task.ID, task.Type, conn.workerID) + + // Use typed params directly - master client should already be configured in the params + var taskParams *worker_pb.TaskParams + if task.TypedParams != nil { + taskParams = task.TypedParams + } else { + // Create basic params if none exist + taskParams = &worker_pb.TaskParams{ + VolumeId: task.VolumeID, + Server: task.Server, + Collection: task.Collection, + } + } + // Send task assignment assignment := &worker_pb.AdminMessage{ Timestamp: time.Now().Unix(), Message: &worker_pb.AdminMessage_TaskAssignment{ TaskAssignment: &worker_pb.TaskAssignment{ - TaskId: task.ID, - TaskType: string(task.Type), - Params: &worker_pb.TaskParams{ - VolumeId: task.VolumeID, - Server: task.Server, - Collection: task.Collection, - Parameters: convertTaskParameters(task.Parameters), - }, + TaskId: task.ID, + TaskType: string(task.Type), + Params: taskParams, Priority: int32(task.Priority), CreatedTime: time.Now().Unix(), }, @@ -348,10 +362,12 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo select { case conn.outgoing <- assignment: - glog.V(2).Infof("Assigned task %s to worker %s", task.ID, conn.workerID) + glog.Infof("DEBUG handleTaskRequest: Successfully assigned task %s to worker %s", task.ID, conn.workerID) case <-time.After(time.Second): glog.Warningf("Failed to send task assignment to worker %s", conn.workerID) } + } else { + // glog.Infof("DEBUG handleTaskRequest: No tasks available for worker %s", conn.workerID) } } |
