Diffstat (limited to 'weed/admin/dash')
-rw-r--r--  weed/admin/dash/admin_server.go           321
-rw-r--r--  weed/admin/dash/collection_management.go  268
-rw-r--r--  weed/admin/dash/config_persistence.go     632
-rw-r--r--  weed/admin/dash/ec_shard_management.go    734
-rw-r--r--  weed/admin/dash/middleware.go              23
-rw-r--r--  weed/admin/dash/types.go                  163
-rw-r--r--  weed/admin/dash/worker_grpc_server.go      34
7 files changed, 2006 insertions, 169 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go
index 6ebade19f..376f3edc7 100644
--- a/weed/admin/dash/admin_server.go
+++ b/weed/admin/dash/admin_server.go
@@ -25,6 +25,7 @@ import (
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/s3api"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
type AdminServer struct {
@@ -126,30 +127,67 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string)
}
}
- // Initialize maintenance system with persistent configuration
+ // Initialize maintenance system - always initialize even without persistent storage
+ var maintenanceConfig *maintenance.MaintenanceConfig
if server.configPersistence.IsConfigured() {
- maintenanceConfig, err := server.configPersistence.LoadMaintenanceConfig()
+ var err error
+ maintenanceConfig, err = server.configPersistence.LoadMaintenanceConfig()
if err != nil {
glog.Errorf("Failed to load maintenance configuration: %v", err)
maintenanceConfig = maintenance.DefaultMaintenanceConfig()
}
- server.InitMaintenanceManager(maintenanceConfig)
- // Start maintenance manager if enabled
- if maintenanceConfig.Enabled {
- go func() {
- if err := server.StartMaintenanceManager(); err != nil {
- glog.Errorf("Failed to start maintenance manager: %v", err)
- }
- }()
+ // Apply new defaults to handle schema changes (like enabling by default)
+ schema := maintenance.GetMaintenanceConfigSchema()
+ if err := schema.ApplyDefaultsToProtobuf(maintenanceConfig); err != nil {
+ glog.Warningf("Failed to apply schema defaults to loaded config: %v", err)
+ }
+
+ // Force enable maintenance system for new default behavior
+ // This handles the case where old configs had Enabled=false as default
+ if !maintenanceConfig.Enabled {
+ glog.V(1).Infof("Enabling maintenance system (new default behavior)")
+ maintenanceConfig.Enabled = true
}
+
+ glog.V(1).Infof("Maintenance system initialized with persistent configuration (enabled: %v)", maintenanceConfig.Enabled)
} else {
- glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode")
+ maintenanceConfig = maintenance.DefaultMaintenanceConfig()
+ glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode (enabled: %v)", maintenanceConfig.Enabled)
+ }
+
+ // Always initialize maintenance manager
+ server.InitMaintenanceManager(maintenanceConfig)
+
+ // Load saved task configurations from persistence
+ server.loadTaskConfigurationsFromPersistence()
+
+ // Start maintenance manager if enabled
+ if maintenanceConfig.Enabled {
+ go func() {
+ // Give master client a bit of time to connect before starting scans
+ time.Sleep(2 * time.Second)
+ if err := server.StartMaintenanceManager(); err != nil {
+ glog.Errorf("Failed to start maintenance manager: %v", err)
+ }
+ }()
}
return server
}
+// loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files
+func (s *AdminServer) loadTaskConfigurationsFromPersistence() {
+ if s.configPersistence == nil || !s.configPersistence.IsConfigured() {
+ glog.V(1).Infof("Config persistence not available, using default task configurations")
+ return
+ }
+
+ // Load task configurations dynamically using the config update registry
+ configUpdateRegistry := tasks.GetGlobalConfigUpdateRegistry()
+ configUpdateRegistry.UpdateAllConfigs(s.configPersistence)
+}
+
// GetCredentialManager returns the credential manager
func (s *AdminServer) GetCredentialManager() *credential.CredentialManager {
return s.credentialManager
@@ -852,6 +890,15 @@ func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"success": true, "message": "Task cancelled"})
}
+// cancelMaintenanceTask cancels a pending maintenance task
+func (as *AdminServer) cancelMaintenanceTask(taskID string) error {
+ if as.maintenanceManager == nil {
+ return fmt.Errorf("maintenance manager not initialized")
+ }
+
+ return as.maintenanceManager.CancelTask(taskID)
+}
+
// GetMaintenanceWorkersAPI returns all maintenance workers
func (as *AdminServer) GetMaintenanceWorkersAPI(c *gin.Context) {
workers, err := as.getMaintenanceWorkers()
@@ -899,13 +946,21 @@ func (as *AdminServer) GetMaintenanceConfigAPI(c *gin.Context) {
// UpdateMaintenanceConfigAPI updates maintenance configuration via API
func (as *AdminServer) UpdateMaintenanceConfigAPI(c *gin.Context) {
- var config MaintenanceConfig
- if err := c.ShouldBindJSON(&config); err != nil {
+ // Parse JSON into a generic map first to handle type conversions
+ var jsonConfig map[string]interface{}
+ if err := c.ShouldBindJSON(&jsonConfig); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
- err := as.updateMaintenanceConfig(&config)
+ // Convert JSON map to protobuf configuration
+ config, err := convertJSONToMaintenanceConfig(jsonConfig)
+ if err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse configuration: " + err.Error()})
+ return
+ }
+
+ err = as.updateMaintenanceConfig(config)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
@@ -951,17 +1006,36 @@ func (as *AdminServer) getMaintenanceQueueData() (*maintenance.MaintenanceQueueD
}, nil
}
+// GetMaintenanceQueueStats returns statistics for the maintenance queue (exported for handlers)
+func (as *AdminServer) GetMaintenanceQueueStats() (*maintenance.QueueStats, error) {
+ return as.getMaintenanceQueueStats()
+}
+
// getMaintenanceQueueStats returns statistics for the maintenance queue
func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, error) {
- // This would integrate with the maintenance queue to get real statistics
- // For now, return mock data
- return &maintenance.QueueStats{
- PendingTasks: 5,
- RunningTasks: 2,
- CompletedToday: 15,
- FailedToday: 1,
- TotalTasks: 23,
- }, nil
+ if as.maintenanceManager == nil {
+ return &maintenance.QueueStats{
+ PendingTasks: 0,
+ RunningTasks: 0,
+ CompletedToday: 0,
+ FailedToday: 0,
+ TotalTasks: 0,
+ }, nil
+ }
+
+ // Get real statistics from maintenance manager
+ stats := as.maintenanceManager.GetStats()
+
+ // Convert MaintenanceStats to QueueStats
+ queueStats := &maintenance.QueueStats{
+ PendingTasks: stats.TasksByStatus[maintenance.TaskStatusPending],
+ RunningTasks: stats.TasksByStatus[maintenance.TaskStatusAssigned] + stats.TasksByStatus[maintenance.TaskStatusInProgress],
+ CompletedToday: stats.CompletedToday,
+ FailedToday: stats.FailedToday,
+ TotalTasks: stats.TotalTasks,
+ }
+
+ return queueStats, nil
}
// getMaintenanceTasks returns all maintenance tasks
@@ -1000,15 +1074,6 @@ func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, erro
return nil, fmt.Errorf("task %s not found", taskID)
}
-// cancelMaintenanceTask cancels a pending maintenance task
-func (as *AdminServer) cancelMaintenanceTask(taskID string) error {
- if as.maintenanceManager == nil {
- return fmt.Errorf("maintenance manager not initialized")
- }
-
- return as.maintenanceManager.CancelTask(taskID)
-}
-
// getMaintenanceWorkers returns all maintenance workers
func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) {
if as.maintenanceManager == nil {
@@ -1110,11 +1175,14 @@ func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigDat
// Load configuration from persistent storage
config, err := as.configPersistence.LoadMaintenanceConfig()
if err != nil {
- glog.Errorf("Failed to load maintenance configuration: %v", err)
// Fallback to default configuration
- config = DefaultMaintenanceConfig()
+ config = maintenance.DefaultMaintenanceConfig()
}
+ // Note: Do NOT apply schema defaults to existing config as it overrides saved values
+ // Only apply defaults when creating new configs or handling fallback cases
+ // The schema defaults should only be used in the UI for new installations
+
// Get system stats from maintenance manager if available
var systemStats *MaintenanceStats
if as.maintenanceManager != nil {
@@ -1139,18 +1207,25 @@ func (as *AdminServer) getMaintenanceConfig() (*maintenance.MaintenanceConfigDat
}
}
- return &MaintenanceConfigData{
+ configData := &MaintenanceConfigData{
Config: config,
IsEnabled: config.Enabled,
LastScanTime: systemStats.LastScanTime,
NextScanTime: systemStats.NextScanTime,
SystemStats: systemStats,
MenuItems: maintenance.BuildMaintenanceMenuItems(),
- }, nil
+ }
+
+ return configData, nil
}
// updateMaintenanceConfig updates maintenance configuration
func (as *AdminServer) updateMaintenanceConfig(config *maintenance.MaintenanceConfig) error {
+ // Use ConfigField validation instead of standalone validation
+ if err := maintenance.ValidateMaintenanceConfigWithSchema(config); err != nil {
+ return fmt.Errorf("configuration validation failed: %v", err)
+ }
+
// Save configuration to persistent storage
if err := as.configPersistence.SaveMaintenanceConfig(config); err != nil {
return fmt.Errorf("failed to save maintenance configuration: %w", err)
@@ -1175,7 +1250,14 @@ func (as *AdminServer) triggerMaintenanceScan() error {
return fmt.Errorf("maintenance manager not initialized")
}
- return as.maintenanceManager.TriggerScan()
+ glog.V(1).Infof("Triggering maintenance scan")
+ err := as.maintenanceManager.TriggerScan()
+ if err != nil {
+ glog.Errorf("Failed to trigger maintenance scan: %v", err)
+ return err
+ }
+ glog.V(1).Infof("Maintenance scan triggered successfully")
+ return nil
}
// TriggerTopicRetentionPurgeAPI triggers topic retention purge via HTTP API
@@ -1265,14 +1347,11 @@ func (as *AdminServer) GetMaintenanceWorkersData() (*MaintenanceWorkersData, err
}
// StartWorkerGrpcServer starts the worker gRPC server
-func (s *AdminServer) StartWorkerGrpcServer(httpPort int) error {
+func (s *AdminServer) StartWorkerGrpcServer(grpcPort int) error {
if s.workerGrpcServer != nil {
return fmt.Errorf("worker gRPC server is already running")
}
- // Calculate gRPC port (HTTP port + 10000)
- grpcPort := httpPort + 10000
-
s.workerGrpcServer = NewWorkerGrpcServer(s)
return s.workerGrpcServer.StartWithTLS(grpcPort)
}
@@ -1412,7 +1491,7 @@ func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool,
}
// Create gRPC connection
- conn, err := grpc.Dial(brokerAddress, s.grpcDialOption)
+ conn, err := grpc.NewClient(brokerAddress, s.grpcDialOption)
if err != nil {
return fmt.Errorf("failed to connect to broker: %w", err)
}
@@ -1501,3 +1580,161 @@ func extractVersioningFromEntry(entry *filer_pb.Entry) bool {
enabled, _ := s3api.LoadVersioningFromExtended(entry)
return enabled
}
+
+// GetConfigPersistence returns the config persistence manager
+func (as *AdminServer) GetConfigPersistence() *ConfigPersistence {
+ return as.configPersistence
+}
+
+// convertJSONToMaintenanceConfig converts JSON map to protobuf MaintenanceConfig
+func convertJSONToMaintenanceConfig(jsonConfig map[string]interface{}) (*maintenance.MaintenanceConfig, error) {
+ config := &maintenance.MaintenanceConfig{}
+
+ // Helper function to get int32 from interface{}
+ getInt32 := func(key string) (int32, error) {
+ if val, ok := jsonConfig[key]; ok {
+ switch v := val.(type) {
+ case int:
+ return int32(v), nil
+ case int32:
+ return v, nil
+ case int64:
+ return int32(v), nil
+ case float64:
+ return int32(v), nil
+ default:
+ return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v)
+ }
+ }
+ return 0, nil
+ }
+
+ // Helper function to get bool from interface{}
+ getBool := func(key string) bool {
+ if val, ok := jsonConfig[key]; ok {
+ if b, ok := val.(bool); ok {
+ return b
+ }
+ }
+ return false
+ }
+
+ var err error
+
+ // Convert basic fields
+ config.Enabled = getBool("enabled")
+
+ if config.ScanIntervalSeconds, err = getInt32("scan_interval_seconds"); err != nil {
+ return nil, err
+ }
+ if config.WorkerTimeoutSeconds, err = getInt32("worker_timeout_seconds"); err != nil {
+ return nil, err
+ }
+ if config.TaskTimeoutSeconds, err = getInt32("task_timeout_seconds"); err != nil {
+ return nil, err
+ }
+ if config.RetryDelaySeconds, err = getInt32("retry_delay_seconds"); err != nil {
+ return nil, err
+ }
+ if config.MaxRetries, err = getInt32("max_retries"); err != nil {
+ return nil, err
+ }
+ if config.CleanupIntervalSeconds, err = getInt32("cleanup_interval_seconds"); err != nil {
+ return nil, err
+ }
+ if config.TaskRetentionSeconds, err = getInt32("task_retention_seconds"); err != nil {
+ return nil, err
+ }
+
+ // Convert policy if present
+ if policyData, ok := jsonConfig["policy"]; ok {
+ if policyMap, ok := policyData.(map[string]interface{}); ok {
+ policy := &maintenance.MaintenancePolicy{}
+
+ if globalMaxConcurrent, err := getInt32FromMap(policyMap, "global_max_concurrent"); err != nil {
+ return nil, err
+ } else {
+ policy.GlobalMaxConcurrent = globalMaxConcurrent
+ }
+
+ if defaultRepeatIntervalSeconds, err := getInt32FromMap(policyMap, "default_repeat_interval_seconds"); err != nil {
+ return nil, err
+ } else {
+ policy.DefaultRepeatIntervalSeconds = defaultRepeatIntervalSeconds
+ }
+
+ if defaultCheckIntervalSeconds, err := getInt32FromMap(policyMap, "default_check_interval_seconds"); err != nil {
+ return nil, err
+ } else {
+ policy.DefaultCheckIntervalSeconds = defaultCheckIntervalSeconds
+ }
+
+ // Convert task policies if present
+ if taskPoliciesData, ok := policyMap["task_policies"]; ok {
+ if taskPoliciesMap, ok := taskPoliciesData.(map[string]interface{}); ok {
+ policy.TaskPolicies = make(map[string]*maintenance.TaskPolicy)
+
+ for taskType, taskPolicyData := range taskPoliciesMap {
+ if taskPolicyMap, ok := taskPolicyData.(map[string]interface{}); ok {
+ taskPolicy := &maintenance.TaskPolicy{}
+
+ taskPolicy.Enabled = getBoolFromMap(taskPolicyMap, "enabled")
+
+ if maxConcurrent, err := getInt32FromMap(taskPolicyMap, "max_concurrent"); err != nil {
+ return nil, err
+ } else {
+ taskPolicy.MaxConcurrent = maxConcurrent
+ }
+
+ if repeatIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "repeat_interval_seconds"); err != nil {
+ return nil, err
+ } else {
+ taskPolicy.RepeatIntervalSeconds = repeatIntervalSeconds
+ }
+
+ if checkIntervalSeconds, err := getInt32FromMap(taskPolicyMap, "check_interval_seconds"); err != nil {
+ return nil, err
+ } else {
+ taskPolicy.CheckIntervalSeconds = checkIntervalSeconds
+ }
+
+ policy.TaskPolicies[taskType] = taskPolicy
+ }
+ }
+ }
+ }
+
+ config.Policy = policy
+ }
+ }
+
+ return config, nil
+}
+
+// Helper functions for map conversion
+func getInt32FromMap(m map[string]interface{}, key string) (int32, error) {
+ if val, ok := m[key]; ok {
+ switch v := val.(type) {
+ case int:
+ return int32(v), nil
+ case int32:
+ return v, nil
+ case int64:
+ return int32(v), nil
+ case float64:
+ return int32(v), nil
+ default:
+ return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v)
+ }
+ }
+ return 0, nil
+}
+
+func getBoolFromMap(m map[string]interface{}, key string) bool {
+ if val, ok := m[key]; ok {
+ if b, ok := val.(bool); ok {
+ return b
+ }
+ }
+ return false
+}
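
Aside, not part of the diff: the map-first binding in UpdateMaintenanceConfigAPI exists because encoding/json decodes every JSON number into float64, so binding directly into int32 protobuf fields fails. A minimal standalone sketch of the tolerant conversion, using only the standard library (the helper name mirrors the one in the diff; everything else is illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// getInt32 mirrors the helper added in this diff: encoding/json turns
// every JSON number into float64, so a type switch is needed before the
// value can be stored in an int32 protobuf field.
func getInt32(m map[string]interface{}, key string) (int32, error) {
	val, ok := m[key]
	if !ok {
		return 0, nil // absent keys fall back to the zero value
	}
	switch v := val.(type) {
	case float64:
		return int32(v), nil
	case int:
		return int32(v), nil
	case int64:
		return int32(v), nil
	default:
		return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v)
	}
}

func main() {
	var cfg map[string]interface{}
	// Every number below arrives as float64 after unmarshaling.
	_ = json.Unmarshal([]byte(`{"scan_interval_seconds": 1800, "enabled": true}`), &cfg)

	n, err := getInt32(cfg, "scan_interval_seconds")
	fmt.Println(n, err) // 1800 <nil>
}
```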
diff --git a/weed/admin/dash/collection_management.go b/weed/admin/dash/collection_management.go
index a70c82918..03c1e452b 100644
--- a/weed/admin/dash/collection_management.go
+++ b/weed/admin/dash/collection_management.go
@@ -12,6 +12,7 @@ import (
func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) {
var collections []CollectionInfo
var totalVolumes int
+ var totalEcVolumes int
var totalFiles int64
var totalSize int64
collectionMap := make(map[string]*CollectionInfo)
@@ -28,6 +29,7 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
+ // Process regular volumes
for _, volInfo := range diskInfo.VolumeInfos {
// Extract collection name from volume info
collectionName := volInfo.Collection
@@ -69,12 +71,13 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) {
totalSize += int64(volInfo.Size)
} else {
newCollection := CollectionInfo{
- Name: collectionName,
- DataCenter: dc.Id,
- VolumeCount: 1,
- FileCount: int64(volInfo.FileCount),
- TotalSize: int64(volInfo.Size),
- DiskTypes: []string{diskType},
+ Name: collectionName,
+ DataCenter: dc.Id,
+ VolumeCount: 1,
+ EcVolumeCount: 0,
+ FileCount: int64(volInfo.FileCount),
+ TotalSize: int64(volInfo.Size),
+ DiskTypes: []string{diskType},
}
collectionMap[collectionName] = &newCollection
totalVolumes++
@@ -82,6 +85,63 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) {
totalSize += int64(volInfo.Size)
}
}
+
+ // Process EC volumes
+ ecVolumeMap := make(map[uint32]bool) // Track unique EC volumes to avoid double counting
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ // Extract collection name from EC shard info
+ collectionName := ecShardInfo.Collection
+ if collectionName == "" {
+ collectionName = "default" // Default collection for EC volumes without explicit collection
+ }
+
+ // Only count each EC volume once (not per shard)
+ if !ecVolumeMap[ecShardInfo.Id] {
+ ecVolumeMap[ecShardInfo.Id] = true
+
+ // Get disk type from disk info, default to hdd if empty
+ diskType := diskInfo.Type
+ if diskType == "" {
+ diskType = "hdd"
+ }
+
+ // Get or create collection info
+ if collection, exists := collectionMap[collectionName]; exists {
+ collection.EcVolumeCount++
+
+ // Update data center if this collection spans multiple DCs
+ if collection.DataCenter != dc.Id && collection.DataCenter != "multi" {
+ collection.DataCenter = "multi"
+ }
+
+ // Add disk type if not already present
+ diskTypeExists := false
+ for _, existingDiskType := range collection.DiskTypes {
+ if existingDiskType == diskType {
+ diskTypeExists = true
+ break
+ }
+ }
+ if !diskTypeExists {
+ collection.DiskTypes = append(collection.DiskTypes, diskType)
+ }
+
+ totalEcVolumes++
+ } else {
+ newCollection := CollectionInfo{
+ Name: collectionName,
+ DataCenter: dc.Id,
+ VolumeCount: 0,
+ EcVolumeCount: 1,
+ FileCount: 0,
+ TotalSize: 0,
+ DiskTypes: []string{diskType},
+ }
+ collectionMap[collectionName] = &newCollection
+ totalEcVolumes++
+ }
+ }
+ }
}
}
}
@@ -112,6 +172,7 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) {
Collections: []CollectionInfo{},
TotalCollections: 0,
TotalVolumes: 0,
+ TotalEcVolumes: 0,
TotalFiles: 0,
TotalSize: 0,
LastUpdated: time.Now(),
@@ -122,8 +183,203 @@ func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) {
Collections: collections,
TotalCollections: len(collections),
TotalVolumes: totalVolumes,
+ TotalEcVolumes: totalEcVolumes,
TotalFiles: totalFiles,
TotalSize: totalSize,
LastUpdated: time.Now(),
}, nil
}
+
+// GetCollectionDetails retrieves detailed information for a specific collection including volumes and EC volumes
+func (s *AdminServer) GetCollectionDetails(collectionName string, page int, pageSize int, sortBy string, sortOrder string) (*CollectionDetailsData, error) {
+ // Set defaults
+ if page < 1 {
+ page = 1
+ }
+ if pageSize < 1 || pageSize > 1000 {
+ pageSize = 25
+ }
+ if sortBy == "" {
+ sortBy = "volume_id"
+ }
+ if sortOrder == "" {
+ sortOrder = "asc"
+ }
+
+ var regularVolumes []VolumeWithTopology
+ var ecVolumes []EcVolumeWithShards
+ var totalFiles int64
+ var totalSize int64
+ dataCenters := make(map[string]bool)
+ diskTypes := make(map[string]bool)
+
+ // Get regular volumes for this collection
+ regularVolumeData, err := s.GetClusterVolumes(1, 10000, "volume_id", "asc", collectionName) // Get all volumes
+ if err != nil {
+ return nil, err
+ }
+
+ regularVolumes = regularVolumeData.Volumes
+ totalSize = regularVolumeData.TotalSize
+
+ // Calculate total files from regular volumes
+ for _, vol := range regularVolumes {
+ totalFiles += int64(vol.FileCount)
+ }
+
+ // Collect data centers and disk types from regular volumes
+ for _, vol := range regularVolumes {
+ dataCenters[vol.DataCenter] = true
+ diskTypes[vol.DiskType] = true
+ }
+
+ // Get EC volumes for this collection
+ ecVolumeData, err := s.GetClusterEcVolumes(1, 10000, "volume_id", "asc", collectionName) // Get all EC volumes
+ if err != nil {
+ return nil, err
+ }
+
+ ecVolumes = ecVolumeData.EcVolumes
+
+ // Collect data centers from EC volumes
+ for _, ecVol := range ecVolumes {
+ for _, dc := range ecVol.DataCenters {
+ dataCenters[dc] = true
+ }
+ }
+
+ // Combine all volumes for sorting and pagination
+ type VolumeForSorting struct {
+ Type string // "regular" or "ec"
+ RegularVolume *VolumeWithTopology
+ EcVolume *EcVolumeWithShards
+ }
+
+ var allVolumes []VolumeForSorting
+ for i := range regularVolumes {
+ allVolumes = append(allVolumes, VolumeForSorting{
+ Type: "regular",
+ RegularVolume: &regularVolumes[i],
+ })
+ }
+ for i := range ecVolumes {
+ allVolumes = append(allVolumes, VolumeForSorting{
+ Type: "ec",
+ EcVolume: &ecVolumes[i],
+ })
+ }
+
+ // Sort all volumes
+ sort.Slice(allVolumes, func(i, j int) bool {
+ var less bool
+ switch sortBy {
+ case "volume_id":
+ var idI, idJ uint32
+ if allVolumes[i].Type == "regular" {
+ idI = allVolumes[i].RegularVolume.Id
+ } else {
+ idI = allVolumes[i].EcVolume.VolumeID
+ }
+ if allVolumes[j].Type == "regular" {
+ idJ = allVolumes[j].RegularVolume.Id
+ } else {
+ idJ = allVolumes[j].EcVolume.VolumeID
+ }
+ less = idI < idJ
+ case "type":
+ // Sort by type first (regular before ec), then by volume ID
+ if allVolumes[i].Type == allVolumes[j].Type {
+ var idI, idJ uint32
+ if allVolumes[i].Type == "regular" {
+ idI = allVolumes[i].RegularVolume.Id
+ } else {
+ idI = allVolumes[i].EcVolume.VolumeID
+ }
+ if allVolumes[j].Type == "regular" {
+ idJ = allVolumes[j].RegularVolume.Id
+ } else {
+ idJ = allVolumes[j].EcVolume.VolumeID
+ }
+ less = idI < idJ
+ } else {
+ less = allVolumes[i].Type < allVolumes[j].Type // "ec" < "regular"
+ }
+ default:
+ // Default to volume ID sort
+ var idI, idJ uint32
+ if allVolumes[i].Type == "regular" {
+ idI = allVolumes[i].RegularVolume.Id
+ } else {
+ idI = allVolumes[i].EcVolume.VolumeID
+ }
+ if allVolumes[j].Type == "regular" {
+ idJ = allVolumes[j].RegularVolume.Id
+ } else {
+ idJ = allVolumes[j].EcVolume.VolumeID
+ }
+ less = idI < idJ
+ }
+
+ if sortOrder == "desc" {
+ return !less
+ }
+ return less
+ })
+
+ // Apply pagination
+ totalVolumesAndEc := len(allVolumes)
+ totalPages := (totalVolumesAndEc + pageSize - 1) / pageSize
+ startIndex := (page - 1) * pageSize
+ endIndex := startIndex + pageSize
+ if endIndex > totalVolumesAndEc {
+ endIndex = totalVolumesAndEc
+ }
+
+ if startIndex >= totalVolumesAndEc {
+ startIndex = 0
+ endIndex = 0
+ }
+
+ // Extract paginated results
+ var paginatedRegularVolumes []VolumeWithTopology
+ var paginatedEcVolumes []EcVolumeWithShards
+
+ for i := startIndex; i < endIndex; i++ {
+ if allVolumes[i].Type == "regular" {
+ paginatedRegularVolumes = append(paginatedRegularVolumes, *allVolumes[i].RegularVolume)
+ } else {
+ paginatedEcVolumes = append(paginatedEcVolumes, *allVolumes[i].EcVolume)
+ }
+ }
+
+ // Convert maps to slices
+ var dcList []string
+ for dc := range dataCenters {
+ dcList = append(dcList, dc)
+ }
+ sort.Strings(dcList)
+
+ var diskTypeList []string
+ for diskType := range diskTypes {
+ diskTypeList = append(diskTypeList, diskType)
+ }
+ sort.Strings(diskTypeList)
+
+ return &CollectionDetailsData{
+ CollectionName: collectionName,
+ RegularVolumes: paginatedRegularVolumes,
+ EcVolumes: paginatedEcVolumes,
+ TotalVolumes: len(regularVolumes),
+ TotalEcVolumes: len(ecVolumes),
+ TotalFiles: totalFiles,
+ TotalSize: totalSize,
+ DataCenters: dcList,
+ DiskTypes: diskTypeList,
+ LastUpdated: time.Now(),
+ Page: page,
+ PageSize: pageSize,
+ TotalPages: totalPages,
+ SortBy: sortBy,
+ SortOrder: sortOrder,
+ }, nil
+}
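
Aside, not part of the diff: GetCollectionDetails merges regular and EC volumes into one slice, sorts it, and then cuts one page out of it. The index arithmetic (ceil division for the page count, plus clamping so an out-of-range page yields an empty result rather than a panic) is isolated below as a runnable sketch:

```go
package main

import "fmt"

// paginate reproduces the index arithmetic used in GetCollectionDetails.
func paginate(total, page, pageSize int) (start, end, totalPages int) {
	totalPages = (total + pageSize - 1) / pageSize // ceil(total / pageSize)
	start = (page - 1) * pageSize
	end = start + pageSize
	if end > total {
		end = total
	}
	if start >= total {
		start, end = 0, 0 // out-of-range page returns nothing
	}
	return start, end, totalPages
}

func main() {
	for _, page := range []int{1, 3, 5} {
		s, e, tp := paginate(55, page, 25)
		fmt.Printf("page %d of %d -> items [%d:%d)\n", page, tp, s, e)
	}
	// page 1 of 3 -> items [0:25)
	// page 3 of 3 -> items [50:55)
	// page 5 of 3 -> items [0:0)
}
```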
diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go
index a2f74f4e7..b6b3074ab 100644
--- a/weed/admin/dash/config_persistence.go
+++ b/weed/admin/dash/config_persistence.go
@@ -1,23 +1,50 @@
package dash
import (
- "encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/proto"
)
const (
- // Configuration file names
- MaintenanceConfigFile = "maintenance.json"
- AdminConfigFile = "admin.json"
+ // Configuration subdirectory
+ ConfigSubdir = "conf"
+
+ // Configuration file names (protobuf binary)
+ MaintenanceConfigFile = "maintenance.pb"
+ VacuumTaskConfigFile = "task_vacuum.pb"
+ ECTaskConfigFile = "task_erasure_coding.pb"
+ BalanceTaskConfigFile = "task_balance.pb"
+ ReplicationTaskConfigFile = "task_replication.pb"
+
+ // JSON reference files
+ MaintenanceConfigJSONFile = "maintenance.json"
+ VacuumTaskConfigJSONFile = "task_vacuum.json"
+ ECTaskConfigJSONFile = "task_erasure_coding.json"
+ BalanceTaskConfigJSONFile = "task_balance.json"
+ ReplicationTaskConfigJSONFile = "task_replication.json"
+
ConfigDirPermissions = 0755
ConfigFilePermissions = 0644
)
+// Task configuration types
+type (
+ VacuumTaskConfig = worker_pb.VacuumTaskConfig
+ ErasureCodingTaskConfig = worker_pb.ErasureCodingTaskConfig
+ BalanceTaskConfig = worker_pb.BalanceTaskConfig
+ ReplicationTaskConfig = worker_pb.ReplicationTaskConfig
+)
+
// ConfigPersistence handles saving and loading configuration files
type ConfigPersistence struct {
dataDir string
@@ -30,122 +57,67 @@ func NewConfigPersistence(dataDir string) *ConfigPersistence {
}
}
-// SaveMaintenanceConfig saves maintenance configuration to JSON file
+// SaveMaintenanceConfig saves maintenance configuration to protobuf file and JSON reference
func (cp *ConfigPersistence) SaveMaintenanceConfig(config *MaintenanceConfig) error {
if cp.dataDir == "" {
return fmt.Errorf("no data directory specified, cannot save configuration")
}
- configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile)
-
- // Create directory if it doesn't exist
- if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil {
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ if err := os.MkdirAll(confDir, ConfigDirPermissions); err != nil {
return fmt.Errorf("failed to create config directory: %w", err)
}
- // Marshal configuration to JSON
- configData, err := json.MarshalIndent(config, "", " ")
- if err != nil {
- return fmt.Errorf("failed to marshal maintenance config: %w", err)
- }
-
- // Write to file
- if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil {
- return fmt.Errorf("failed to write maintenance config file: %w", err)
- }
-
- glog.V(1).Infof("Saved maintenance configuration to %s", configPath)
- return nil
-}
-
-// LoadMaintenanceConfig loads maintenance configuration from JSON file
-func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) {
- if cp.dataDir == "" {
- glog.V(1).Infof("No data directory specified, using default maintenance configuration")
- return DefaultMaintenanceConfig(), nil
- }
-
- configPath := filepath.Join(cp.dataDir, MaintenanceConfigFile)
-
- // Check if file exists
- if _, err := os.Stat(configPath); os.IsNotExist(err) {
- glog.V(1).Infof("Maintenance config file does not exist, using defaults: %s", configPath)
- return DefaultMaintenanceConfig(), nil
- }
-
- // Read file
- configData, err := os.ReadFile(configPath)
+ // Save as protobuf (primary format)
+ pbConfigPath := filepath.Join(confDir, MaintenanceConfigFile)
+ pbData, err := proto.Marshal(config)
if err != nil {
- return nil, fmt.Errorf("failed to read maintenance config file: %w", err)
+ return fmt.Errorf("failed to marshal maintenance config to protobuf: %w", err)
}
- // Unmarshal JSON
- var config MaintenanceConfig
- if err := json.Unmarshal(configData, &config); err != nil {
- return nil, fmt.Errorf("failed to unmarshal maintenance config: %w", err)
+ if err := os.WriteFile(pbConfigPath, pbData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write protobuf config file: %w", err)
}
- glog.V(1).Infof("Loaded maintenance configuration from %s", configPath)
- return &config, nil
-}
-
-// SaveAdminConfig saves general admin configuration to JSON file
-func (cp *ConfigPersistence) SaveAdminConfig(config map[string]interface{}) error {
- if cp.dataDir == "" {
- return fmt.Errorf("no data directory specified, cannot save configuration")
- }
-
- configPath := filepath.Join(cp.dataDir, AdminConfigFile)
-
- // Create directory if it doesn't exist
- if err := os.MkdirAll(cp.dataDir, ConfigDirPermissions); err != nil {
- return fmt.Errorf("failed to create config directory: %w", err)
- }
-
- // Marshal configuration to JSON
- configData, err := json.MarshalIndent(config, "", " ")
+ // Save JSON reference copy for debugging
+ jsonConfigPath := filepath.Join(confDir, MaintenanceConfigJSONFile)
+ jsonData, err := protojson.MarshalOptions{
+ Multiline: true,
+ Indent: " ",
+ EmitUnpopulated: true,
+ }.Marshal(config)
if err != nil {
- return fmt.Errorf("failed to marshal admin config: %w", err)
+ return fmt.Errorf("failed to marshal maintenance config to JSON: %w", err)
}
- // Write to file
- if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil {
- return fmt.Errorf("failed to write admin config file: %w", err)
+ if err := os.WriteFile(jsonConfigPath, jsonData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write JSON reference file: %w", err)
}
- glog.V(1).Infof("Saved admin configuration to %s", configPath)
return nil
}
-// LoadAdminConfig loads general admin configuration from JSON file
-func (cp *ConfigPersistence) LoadAdminConfig() (map[string]interface{}, error) {
+// LoadMaintenanceConfig loads maintenance configuration from protobuf file
+func (cp *ConfigPersistence) LoadMaintenanceConfig() (*MaintenanceConfig, error) {
if cp.dataDir == "" {
- glog.V(1).Infof("No data directory specified, using default admin configuration")
- return make(map[string]interface{}), nil
- }
-
- configPath := filepath.Join(cp.dataDir, AdminConfigFile)
-
- // Check if file exists
- if _, err := os.Stat(configPath); os.IsNotExist(err) {
- glog.V(1).Infof("Admin config file does not exist, using defaults: %s", configPath)
- return make(map[string]interface{}), nil
+ return DefaultMaintenanceConfig(), nil
}
- // Read file
- configData, err := os.ReadFile(configPath)
- if err != nil {
- return nil, fmt.Errorf("failed to read admin config file: %w", err)
- }
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, MaintenanceConfigFile)
- // Unmarshal JSON
- var config map[string]interface{}
- if err := json.Unmarshal(configData, &config); err != nil {
- return nil, fmt.Errorf("failed to unmarshal admin config: %w", err)
+ // Try to load from protobuf file
+ if configData, err := os.ReadFile(configPath); err == nil {
+ var config MaintenanceConfig
+ if err := proto.Unmarshal(configData, &config); err == nil {
+ // Always populate policy from separate task configuration files
+ config.Policy = buildPolicyFromTaskConfigs()
+ return &config, nil
+ }
}
- glog.V(1).Infof("Loaded admin configuration from %s", configPath)
- return config, nil
+ // File doesn't exist or failed to load, use defaults
+ return DefaultMaintenanceConfig(), nil
}
// GetConfigPath returns the path to a configuration file
@@ -153,24 +125,35 @@ func (cp *ConfigPersistence) GetConfigPath(filename string) string {
if cp.dataDir == "" {
return ""
}
- return filepath.Join(cp.dataDir, filename)
+
+ // All configs go in conf subdirectory
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ return filepath.Join(confDir, filename)
}
-// ListConfigFiles returns all configuration files in the data directory
+// ListConfigFiles returns all configuration files in the conf subdirectory
func (cp *ConfigPersistence) ListConfigFiles() ([]string, error) {
if cp.dataDir == "" {
return nil, fmt.Errorf("no data directory specified")
}
- files, err := os.ReadDir(cp.dataDir)
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ files, err := os.ReadDir(confDir)
if err != nil {
+ // If conf directory doesn't exist, return empty list
+ if os.IsNotExist(err) {
+ return []string{}, nil
+ }
return nil, fmt.Errorf("failed to read config directory: %w", err)
}
var configFiles []string
for _, file := range files {
- if !file.IsDir() && filepath.Ext(file.Name()) == ".json" {
- configFiles = append(configFiles, file.Name())
+ if !file.IsDir() {
+ ext := filepath.Ext(file.Name())
+ if ext == ".json" || ext == ".pb" {
+ configFiles = append(configFiles, file.Name())
+ }
}
}
@@ -183,7 +166,7 @@ func (cp *ConfigPersistence) BackupConfig(filename string) error {
return fmt.Errorf("no data directory specified")
}
- configPath := filepath.Join(cp.dataDir, filename)
+ configPath := cp.GetConfigPath(filename)
if _, err := os.Stat(configPath); os.IsNotExist(err) {
return fmt.Errorf("config file does not exist: %s", filename)
}
@@ -191,7 +174,10 @@ func (cp *ConfigPersistence) BackupConfig(filename string) error {
// Create backup filename with timestamp
timestamp := time.Now().Format("2006-01-02_15-04-05")
backupName := fmt.Sprintf("%s.backup_%s", filename, timestamp)
- backupPath := filepath.Join(cp.dataDir, backupName)
+
+ // Determine backup directory (conf subdirectory)
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ backupPath := filepath.Join(confDir, backupName)
// Copy file
configData, err := os.ReadFile(configPath)
@@ -213,7 +199,10 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error {
return fmt.Errorf("no data directory specified")
}
- backupPath := filepath.Join(cp.dataDir, backupName)
+ // Determine backup path (conf subdirectory)
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ backupPath := filepath.Join(confDir, backupName)
+
if _, err := os.Stat(backupPath); os.IsNotExist(err) {
return fmt.Errorf("backup file does not exist: %s", backupName)
}
@@ -225,7 +214,7 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error {
}
// Write to config file
- configPath := filepath.Join(cp.dataDir, filename)
+ configPath := cp.GetConfigPath(filename)
if err := os.WriteFile(configPath, backupData, ConfigFilePermissions); err != nil {
return fmt.Errorf("failed to restore config: %w", err)
}
@@ -234,6 +223,364 @@ func (cp *ConfigPersistence) RestoreConfig(filename, backupName string) error {
return nil
}
+// SaveVacuumTaskConfig saves vacuum task configuration to protobuf file
+func (cp *ConfigPersistence) SaveVacuumTaskConfig(config *VacuumTaskConfig) error {
+ return cp.saveTaskConfig(VacuumTaskConfigFile, config)
+}
+
+// SaveVacuumTaskPolicy saves complete vacuum task policy to protobuf file
+func (cp *ConfigPersistence) SaveVacuumTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return cp.saveTaskConfig(VacuumTaskConfigFile, policy)
+}
+
+// LoadVacuumTaskConfig loads vacuum task configuration from protobuf file
+func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) {
+ // Load as TaskPolicy and extract vacuum config
+ if taskPolicy, err := cp.LoadVacuumTaskPolicy(); err == nil && taskPolicy != nil {
+ if vacuumConfig := taskPolicy.GetVacuumConfig(); vacuumConfig != nil {
+ return vacuumConfig, nil
+ }
+ }
+
+ // Return default config if no valid config found
+ return &VacuumTaskConfig{
+ GarbageThreshold: 0.3,
+ MinVolumeAgeHours: 24,
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ }, nil
+}
+
+// LoadVacuumTaskPolicy loads complete vacuum task policy from protobuf file
+func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) {
+ if cp.dataDir == "" {
+ // Return default policy if no data directory
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 2,
+ RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds
+ CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+ VacuumConfig: &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: 0.3,
+ MinVolumeAgeHours: 24,
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ },
+ },
+ }, nil
+ }
+
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, VacuumTaskConfigFile)
+
+ // Check if file exists
+ if _, err := os.Stat(configPath); os.IsNotExist(err) {
+ // Return default policy if file doesn't exist
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 2,
+ RepeatIntervalSeconds: 24 * 3600, // 24 hours in seconds
+ CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+ VacuumConfig: &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: 0.3,
+ MinVolumeAgeHours: 24,
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ },
+ },
+ }, nil
+ }
+
+ // Read file
+ configData, err := os.ReadFile(configPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read vacuum task config file: %w", err)
+ }
+
+ // Try to unmarshal as TaskPolicy
+ var policy worker_pb.TaskPolicy
+ if err := proto.Unmarshal(configData, &policy); err == nil {
+ // Validate that it's actually a TaskPolicy with vacuum config
+ if policy.GetVacuumConfig() != nil {
+ glog.V(1).Infof("Loaded vacuum task policy from %s", configPath)
+ return &policy, nil
+ }
+ }
+
+ return nil, fmt.Errorf("failed to unmarshal vacuum task configuration")
+}
+
+// SaveErasureCodingTaskConfig saves EC task configuration to protobuf file
+func (cp *ConfigPersistence) SaveErasureCodingTaskConfig(config *ErasureCodingTaskConfig) error {
+ return cp.saveTaskConfig(ECTaskConfigFile, config)
+}
+
+// SaveErasureCodingTaskPolicy saves complete EC task policy to protobuf file
+func (cp *ConfigPersistence) SaveErasureCodingTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return cp.saveTaskConfig(ECTaskConfigFile, policy)
+}
+
+// LoadErasureCodingTaskConfig loads EC task configuration from protobuf file
+func (cp *ConfigPersistence) LoadErasureCodingTaskConfig() (*ErasureCodingTaskConfig, error) {
+ // Load as TaskPolicy and extract EC config
+ if taskPolicy, err := cp.LoadErasureCodingTaskPolicy(); err == nil && taskPolicy != nil {
+ if ecConfig := taskPolicy.GetErasureCodingConfig(); ecConfig != nil {
+ return ecConfig, nil
+ }
+ }
+
+ // Return default config if no valid config found
+ return &ErasureCodingTaskConfig{
+ FullnessRatio: 0.9,
+ QuietForSeconds: 3600,
+ MinVolumeSizeMb: 1024,
+ CollectionFilter: "",
+ }, nil
+}
+
+// LoadErasureCodingTaskPolicy loads complete EC task policy from protobuf file
+func (cp *ConfigPersistence) LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) {
+ if cp.dataDir == "" {
+ // Return default policy if no data directory
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 1,
+ RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds
+ CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: 0.9,
+ QuietForSeconds: 3600,
+ MinVolumeSizeMb: 1024,
+ CollectionFilter: "",
+ },
+ },
+ }, nil
+ }
+
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, ECTaskConfigFile)
+
+ // Check if file exists
+ if _, err := os.Stat(configPath); os.IsNotExist(err) {
+ // Return default policy if file doesn't exist
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 1,
+ RepeatIntervalSeconds: 168 * 3600, // 1 week in seconds
+ CheckIntervalSeconds: 24 * 3600, // 24 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: 0.9,
+ QuietForSeconds: 3600,
+ MinVolumeSizeMb: 1024,
+ CollectionFilter: "",
+ },
+ },
+ }, nil
+ }
+
+ // Read file
+ configData, err := os.ReadFile(configPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read EC task config file: %w", err)
+ }
+
+ // Try to unmarshal as TaskPolicy
+ var policy worker_pb.TaskPolicy
+ if err := proto.Unmarshal(configData, &policy); err == nil {
+ // Validate that it's actually a TaskPolicy with EC config
+ if policy.GetErasureCodingConfig() != nil {
+ glog.V(1).Infof("Loaded EC task policy from %s", configPath)
+ return &policy, nil
+ }
+ }
+
+ return nil, fmt.Errorf("failed to unmarshal EC task configuration")
+}
+
+// SaveBalanceTaskConfig saves balance task configuration to protobuf file
+func (cp *ConfigPersistence) SaveBalanceTaskConfig(config *BalanceTaskConfig) error {
+ return cp.saveTaskConfig(BalanceTaskConfigFile, config)
+}
+
+// SaveBalanceTaskPolicy saves complete balance task policy to protobuf file
+func (cp *ConfigPersistence) SaveBalanceTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return cp.saveTaskConfig(BalanceTaskConfigFile, policy)
+}
+
+// LoadBalanceTaskConfig loads balance task configuration from protobuf file
+func (cp *ConfigPersistence) LoadBalanceTaskConfig() (*BalanceTaskConfig, error) {
+ // Load as TaskPolicy and extract balance config
+ if taskPolicy, err := cp.LoadBalanceTaskPolicy(); err == nil && taskPolicy != nil {
+ if balanceConfig := taskPolicy.GetBalanceConfig(); balanceConfig != nil {
+ return balanceConfig, nil
+ }
+ }
+
+ // Return default config if no valid config found
+ return &BalanceTaskConfig{
+ ImbalanceThreshold: 0.1,
+ MinServerCount: 2,
+ }, nil
+}
+
+// LoadBalanceTaskPolicy loads complete balance task policy from protobuf file
+func (cp *ConfigPersistence) LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) {
+ if cp.dataDir == "" {
+ // Return default policy if no data directory
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 1,
+ RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+ BalanceConfig: &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: 0.1,
+ MinServerCount: 2,
+ },
+ },
+ }, nil
+ }
+
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, BalanceTaskConfigFile)
+
+ // Check if file exists
+ if _, err := os.Stat(configPath); os.IsNotExist(err) {
+ // Return default policy if file doesn't exist
+ return &worker_pb.TaskPolicy{
+ Enabled: true,
+ MaxConcurrent: 1,
+ RepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ CheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
+ TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+ BalanceConfig: &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: 0.1,
+ MinServerCount: 2,
+ },
+ },
+ }, nil
+ }
+
+ // Read file
+ configData, err := os.ReadFile(configPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read balance task config file: %w", err)
+ }
+
+ // Try to unmarshal as TaskPolicy
+ var policy worker_pb.TaskPolicy
+ if err := proto.Unmarshal(configData, &policy); err == nil {
+ // Validate that it's actually a TaskPolicy with balance config
+ if policy.GetBalanceConfig() != nil {
+ glog.V(1).Infof("Loaded balance task policy from %s", configPath)
+ return &policy, nil
+ }
+ }
+
+ return nil, fmt.Errorf("failed to unmarshal balance task configuration")
+}
+
+// SaveReplicationTaskConfig saves replication task configuration to protobuf file
+func (cp *ConfigPersistence) SaveReplicationTaskConfig(config *ReplicationTaskConfig) error {
+ return cp.saveTaskConfig(ReplicationTaskConfigFile, config)
+}
+
+// LoadReplicationTaskConfig loads replication task configuration from protobuf file
+func (cp *ConfigPersistence) LoadReplicationTaskConfig() (*ReplicationTaskConfig, error) {
+ var config ReplicationTaskConfig
+ err := cp.loadTaskConfig(ReplicationTaskConfigFile, &config)
+ if err != nil {
+ // Return default config if file doesn't exist
+ if os.IsNotExist(err) {
+ return &ReplicationTaskConfig{
+ TargetReplicaCount: 1,
+ }, nil
+ }
+ return nil, err
+ }
+ return &config, nil
+}
+
+// saveTaskConfig is a generic helper for saving task configurations with both protobuf and JSON reference
+func (cp *ConfigPersistence) saveTaskConfig(filename string, config proto.Message) error {
+ if cp.dataDir == "" {
+ return fmt.Errorf("no data directory specified, cannot save task configuration")
+ }
+
+ // Create conf subdirectory path
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, filename)
+
+ // Generate JSON reference filename
+ jsonFilename := filename[:len(filename)-3] + ".json" // Replace .pb with .json
+ jsonPath := filepath.Join(confDir, jsonFilename)
+
+ // Create conf directory if it doesn't exist
+ if err := os.MkdirAll(confDir, ConfigDirPermissions); err != nil {
+ return fmt.Errorf("failed to create config directory: %w", err)
+ }
+
+ // Marshal configuration to protobuf binary format
+ configData, err := proto.Marshal(config)
+ if err != nil {
+ return fmt.Errorf("failed to marshal task config: %w", err)
+ }
+
+ // Write protobuf file
+ if err := os.WriteFile(configPath, configData, ConfigFilePermissions); err != nil {
+ return fmt.Errorf("failed to write task config file: %w", err)
+ }
+
+ // Marshal configuration to JSON for reference
+ marshaler := protojson.MarshalOptions{
+ Multiline: true,
+ Indent: " ",
+ EmitUnpopulated: true,
+ }
+ jsonData, err := marshaler.Marshal(config)
+ if err != nil {
+ glog.Warningf("Failed to marshal task config to JSON reference: %v", err)
+ } else {
+ // Write JSON reference file
+ if err := os.WriteFile(jsonPath, jsonData, ConfigFilePermissions); err != nil {
+ glog.Warningf("Failed to write task config JSON reference: %v", err)
+ }
+ }
+
+ glog.V(1).Infof("Saved task configuration to %s (with JSON reference)", configPath)
+ return nil
+}
+
+// loadTaskConfig is a generic helper for loading task configurations from conf subdirectory
+func (cp *ConfigPersistence) loadTaskConfig(filename string, config proto.Message) error {
+ if cp.dataDir == "" {
+ return os.ErrNotExist // Will trigger default config return
+ }
+
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ configPath := filepath.Join(confDir, filename)
+
+ // Check if file exists
+ if _, err := os.Stat(configPath); os.IsNotExist(err) {
+ return err // Will trigger default config return
+ }
+
+ // Read file
+ configData, err := os.ReadFile(configPath)
+ if err != nil {
+ return fmt.Errorf("failed to read task config file: %w", err)
+ }
+
+ // Unmarshal protobuf binary data
+ if err := proto.Unmarshal(configData, config); err != nil {
+ return fmt.Errorf("failed to unmarshal task config: %w", err)
+ }
+
+ glog.V(1).Infof("Loaded task configuration from %s", configPath)
+ return nil
+}
+
// GetDataDir returns the data directory path
func (cp *ConfigPersistence) GetDataDir() string {
return cp.dataDir
@@ -249,6 +596,7 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
info := map[string]interface{}{
"data_dir_configured": cp.IsConfigured(),
"data_dir": cp.dataDir,
+ "config_subdir": ConfigSubdir,
}
if cp.IsConfigured() {
@@ -256,10 +604,18 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
if _, err := os.Stat(cp.dataDir); err == nil {
info["data_dir_exists"] = true
- // List config files
- configFiles, err := cp.ListConfigFiles()
- if err == nil {
- info["config_files"] = configFiles
+ // Check if conf subdirectory exists
+ confDir := filepath.Join(cp.dataDir, ConfigSubdir)
+ if _, err := os.Stat(confDir); err == nil {
+ info["conf_dir_exists"] = true
+
+ // List config files
+ configFiles, err := cp.ListConfigFiles()
+ if err == nil {
+ info["config_files"] = configFiles
+ }
+ } else {
+ info["conf_dir_exists"] = false
}
} else {
info["data_dir_exists"] = false
@@ -268,3 +624,67 @@ func (cp *ConfigPersistence) GetConfigInfo() map[string]interface{} {
return info
}
+
+// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
+func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
+ policy := &worker_pb.MaintenancePolicy{
+ GlobalMaxConcurrent: 4,
+ DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
+ TaskPolicies: make(map[string]*worker_pb.TaskPolicy),
+ }
+
+ // Load vacuum task configuration
+ if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
+ policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
+ Enabled: vacuumConfig.Enabled,
+ MaxConcurrent: int32(vacuumConfig.MaxConcurrent),
+ RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+ VacuumConfig: &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
+ MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
+ MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
+ },
+ },
+ }
+ }
+
+ // Load erasure coding task configuration
+ if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
+ policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
+ Enabled: ecConfig.Enabled,
+ MaxConcurrent: int32(ecConfig.MaxConcurrent),
+ RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: float64(ecConfig.FullnessRatio),
+ QuietForSeconds: int32(ecConfig.QuietForSeconds),
+ MinVolumeSizeMb: int32(ecConfig.MinSizeMB),
+ CollectionFilter: ecConfig.CollectionFilter,
+ },
+ },
+ }
+ }
+
+ // Load balance task configuration
+ if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
+ policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
+ Enabled: balanceConfig.Enabled,
+ MaxConcurrent: int32(balanceConfig.MaxConcurrent),
+ RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+ BalanceConfig: &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
+ MinServerCount: int32(balanceConfig.MinServerCount),
+ },
+ },
+ }
+ }
+
+ glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
+ return policy
+}
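
Aside, not part of the diff: saveTaskConfig writes each task configuration twice. The .pb protobuf binary is the format that gets read back; the protojson copy is a human-readable reference only. A self-contained sketch of that dual write follows; structpb.Struct stands in for the worker_pb policy messages purely so the example compiles without the SeaweedFS module:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"

	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"
)

// saveBoth mirrors the dual-write in saveTaskConfig: binary protobuf is
// the primary, machine-read format; JSON is a debugging reference that
// is never loaded back.
func saveBoth(dir, name string, msg proto.Message) error {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return err
	}
	pbData, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	if err := os.WriteFile(filepath.Join(dir, name+".pb"), pbData, 0644); err != nil {
		return err
	}
	jsonData, err := protojson.MarshalOptions{
		Multiline:       true,
		Indent:          "  ",
		EmitUnpopulated: true,
	}.Marshal(msg)
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, name+".json"), jsonData, 0644)
}

func main() {
	// structpb.Struct is a stand-in for worker_pb.TaskPolicy here.
	cfg, _ := structpb.NewStruct(map[string]interface{}{
		"enabled":        true,
		"max_concurrent": 2,
	})
	if err := saveBoth("conf", "task_vacuum", cfg); err != nil {
		fmt.Println("save failed:", err)
		return
	}
	// Round-trip the binary copy to show it is the authoritative format.
	data, _ := os.ReadFile("conf/task_vacuum.pb")
	var back structpb.Struct
	fmt.Println(proto.Unmarshal(data, &back), back.Fields["max_concurrent"].GetNumberValue())
}
```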
diff --git a/weed/admin/dash/ec_shard_management.go b/weed/admin/dash/ec_shard_management.go
new file mode 100644
index 000000000..272890cf0
--- /dev/null
+++ b/weed/admin/dash/ec_shard_management.go
@@ -0,0 +1,734 @@
+package dash
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+)
+
+// GetClusterEcShards retrieves cluster EC shards data with pagination, sorting, and filtering
+func (s *AdminServer) GetClusterEcShards(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcShardsData, error) {
+ // Set defaults
+ if page < 1 {
+ page = 1
+ }
+ if pageSize < 1 || pageSize > 1000 {
+ pageSize = 100
+ }
+ if sortBy == "" {
+ sortBy = "volume_id"
+ }
+ if sortOrder == "" {
+ sortOrder = "asc"
+ }
+
+ var ecShards []EcShardWithInfo
+ volumeShardsMap := make(map[uint32]map[int]bool) // volumeId -> set of shards present
+ volumesWithAllShards := 0
+ volumesWithMissingShards := 0
+
+ // Get detailed EC shard information via gRPC
+ err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ if err != nil {
+ return err
+ }
+
+ if resp.TopologyInfo != nil {
+ for _, dc := range resp.TopologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, node := range rack.DataNodeInfos {
+ for _, diskInfo := range node.DiskInfos {
+ // Process EC shard information
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ volumeId := ecShardInfo.Id
+
+ // Initialize volume shards map if needed
+ if volumeShardsMap[volumeId] == nil {
+ volumeShardsMap[volumeId] = make(map[int]bool)
+ }
+
+ // Create individual shard entries for each shard this server has
+ shardBits := ecShardInfo.EcIndexBits
+ for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+ if (shardBits & (1 << uint(shardId))) != 0 {
+ // Mark this shard as present for this volume
+ volumeShardsMap[volumeId][shardId] = true
+
+ ecShard := EcShardWithInfo{
+ VolumeID: volumeId,
+ ShardID: uint32(shardId),
+ Collection: ecShardInfo.Collection,
+ Size: 0, // EC shards don't have individual size in the API response
+ Server: node.Id,
+ DataCenter: dc.Id,
+ Rack: rack.Id,
+ DiskType: diskInfo.Type,
+ ModifiedTime: 0, // Not available in current API
+ EcIndexBits: ecShardInfo.EcIndexBits,
+ ShardCount: getShardCount(ecShardInfo.EcIndexBits),
+ }
+ ecShards = append(ecShards, ecShard)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ // Calculate volume-level completeness (across all servers)
+ volumeCompleteness := make(map[uint32]bool)
+ volumeMissingShards := make(map[uint32][]int)
+
+ for volumeId, shardsPresent := range volumeShardsMap {
+ var missingShards []int
+ shardCount := len(shardsPresent)
+
+ // Find which shards are missing for this volume across ALL servers
+ for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+ if !shardsPresent[shardId] {
+ missingShards = append(missingShards, shardId)
+ }
+ }
+
+ isComplete := (shardCount == erasure_coding.TotalShardsCount)
+ volumeCompleteness[volumeId] = isComplete
+ volumeMissingShards[volumeId] = missingShards
+
+ if isComplete {
+ volumesWithAllShards++
+ } else {
+ volumesWithMissingShards++
+ }
+ }
+
+ // Update completeness info for each shard based on volume-level completeness
+ for i := range ecShards {
+ volumeId := ecShards[i].VolumeID
+ ecShards[i].IsComplete = volumeCompleteness[volumeId]
+ ecShards[i].MissingShards = volumeMissingShards[volumeId]
+ }
+
+ // Filter by collection if specified
+ if collection != "" {
+ var filteredShards []EcShardWithInfo
+ for _, shard := range ecShards {
+ if shard.Collection == collection {
+ filteredShards = append(filteredShards, shard)
+ }
+ }
+ ecShards = filteredShards
+ }
+
+ // Sort the results
+ sortEcShards(ecShards, sortBy, sortOrder)
+
+ // Calculate statistics for conditional display
+ dataCenters := make(map[string]bool)
+ racks := make(map[string]bool)
+ collections := make(map[string]bool)
+
+ for _, shard := range ecShards {
+ dataCenters[shard.DataCenter] = true
+ racks[shard.Rack] = true
+ if shard.Collection != "" {
+ collections[shard.Collection] = true
+ }
+ }
+
+ // Pagination
+ totalShards := len(ecShards)
+ totalPages := (totalShards + pageSize - 1) / pageSize
+ startIndex := (page - 1) * pageSize
+ endIndex := startIndex + pageSize
+ if endIndex > totalShards {
+ endIndex = totalShards
+ }
+
+ if startIndex >= totalShards {
+ startIndex = 0
+ endIndex = 0
+ }
+
+ paginatedShards := ecShards[startIndex:endIndex]
+
+ // Build response
+ data := &ClusterEcShardsData{
+ EcShards: paginatedShards,
+ TotalShards: totalShards,
+ TotalVolumes: len(volumeShardsMap),
+ LastUpdated: time.Now(),
+
+ // Pagination
+ CurrentPage: page,
+ TotalPages: totalPages,
+ PageSize: pageSize,
+
+ // Sorting
+ SortBy: sortBy,
+ SortOrder: sortOrder,
+
+ // Statistics
+ DataCenterCount: len(dataCenters),
+ RackCount: len(racks),
+ CollectionCount: len(collections),
+
+ // Conditional display flags
+ ShowDataCenterColumn: len(dataCenters) > 1,
+ ShowRackColumn: len(racks) > 1,
+ ShowCollectionColumn: len(collections) > 1 || collection != "",
+
+ // Filtering
+ FilterCollection: collection,
+
+ // EC specific statistics
+ ShardsPerVolume: make(map[uint32]int), // Populated below from volumeShardsMap
+ VolumesWithAllShards: volumesWithAllShards,
+ VolumesWithMissingShards: volumesWithMissingShards,
+ }
+
+ // Recalculate ShardsPerVolume for the response
+ for volumeId, shardsPresent := range volumeShardsMap {
+ data.ShardsPerVolume[volumeId] = len(shardsPresent)
+ }
+
+ // Set single values when only one exists
+ if len(dataCenters) == 1 {
+ for dc := range dataCenters {
+ data.SingleDataCenter = dc
+ break
+ }
+ }
+ if len(racks) == 1 {
+ for rack := range racks {
+ data.SingleRack = rack
+ break
+ }
+ }
+ if len(collections) == 1 {
+ for col := range collections {
+ data.SingleCollection = col
+ break
+ }
+ }
+
+ return data, nil
+}
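+
+// Usage sketch (illustrative, not part of this change): an HTTP handler could
+// expose this data as JSON. The route and handler name are hypothetical, and
+// the sketch assumes the enclosing method shares the (page, pageSize, sortBy,
+// sortOrder, collection) signature used by GetClusterEcVolumes below:
+//
+//	func (s *AdminServer) ShowClusterEcShards(c *gin.Context) {
+//	    page, _ := strconv.Atoi(c.DefaultQuery("page", "1"))
+//	    pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "100"))
+//	    data, err := s.GetClusterEcShards(page, pageSize, c.Query("sort_by"), c.Query("sort_order"), c.Query("collection"))
+//	    if err != nil {
+//	        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+//	        return
+//	    }
+//	    c.JSON(http.StatusOK, data)
+//	}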
+
+// GetClusterEcVolumes retrieves cluster EC volumes data grouped by volume ID with shard locations
+func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcVolumesData, error) {
+ // Set defaults
+ if page < 1 {
+ page = 1
+ }
+ if pageSize < 1 || pageSize > 1000 {
+ pageSize = 100
+ }
+ if sortBy == "" {
+ sortBy = "volume_id"
+ }
+ if sortOrder == "" {
+ sortOrder = "asc"
+ }
+
+ volumeData := make(map[uint32]*EcVolumeWithShards)
+ totalShards := 0
+
+ // Get detailed EC shard information via gRPC
+ err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ if err != nil {
+ return err
+ }
+
+ if resp.TopologyInfo != nil {
+ for _, dc := range resp.TopologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, node := range rack.DataNodeInfos {
+ for _, diskInfo := range node.DiskInfos {
+ // Process EC shard information
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ volumeId := ecShardInfo.Id
+
+ // Initialize volume data if needed
+ if volumeData[volumeId] == nil {
+ volumeData[volumeId] = &EcVolumeWithShards{
+ VolumeID: volumeId,
+ Collection: ecShardInfo.Collection,
+ TotalShards: 0,
+ IsComplete: false,
+ MissingShards: []int{},
+ ShardLocations: make(map[int]string),
+ ShardSizes: make(map[int]int64),
+ DataCenters: []string{},
+ Servers: []string{},
+ Racks: []string{},
+ }
+ }
+
+ volume := volumeData[volumeId]
+
+ // Track data centers and servers
+ dcExists := false
+ for _, existingDc := range volume.DataCenters {
+ if existingDc == dc.Id {
+ dcExists = true
+ break
+ }
+ }
+ if !dcExists {
+ volume.DataCenters = append(volume.DataCenters, dc.Id)
+ }
+
+ serverExists := false
+ for _, existingServer := range volume.Servers {
+ if existingServer == node.Id {
+ serverExists = true
+ break
+ }
+ }
+ if !serverExists {
+ volume.Servers = append(volume.Servers, node.Id)
+ }
+
+ // Track racks
+ rackExists := false
+ for _, existingRack := range volume.Racks {
+ if existingRack == rack.Id {
+ rackExists = true
+ break
+ }
+ }
+ if !rackExists {
+ volume.Racks = append(volume.Racks, rack.Id)
+ }
+
+ // Process each shard this server has for this volume
+ shardBits := ecShardInfo.EcIndexBits
+ for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+ if (shardBits & (1 << uint(shardId))) != 0 {
+ // Record shard location
+ volume.ShardLocations[shardId] = node.Id
+ totalShards++
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ // Collect shard size information from volume servers
+ for volumeId, volume := range volumeData {
+		// volume.Servers was already deduplicated while walking the topology,
+		// so each server is queried at most once for this volume's shard sizes
+		for _, server := range volume.Servers {
+ err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
+ resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
+ VolumeId: volumeId,
+ })
+ if err != nil {
+ glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeId, err)
+ return nil // Continue with other servers, don't fail the entire request
+ }
+
+ // Update shard sizes
+ for _, shardInfo := range resp.EcShardInfos {
+ volume.ShardSizes[int(shardInfo.ShardId)] = shardInfo.Size
+ }
+
+ return nil
+ })
+ if err != nil {
+ glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
+ }
+ }
+ }
+
+ // Calculate completeness for each volume
+ completeVolumes := 0
+ incompleteVolumes := 0
+
+ for _, volume := range volumeData {
+ volume.TotalShards = len(volume.ShardLocations)
+
+ // Find missing shards
+ var missingShards []int
+ for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+ if _, exists := volume.ShardLocations[shardId]; !exists {
+ missingShards = append(missingShards, shardId)
+ }
+ }
+
+ volume.MissingShards = missingShards
+ volume.IsComplete = (len(missingShards) == 0)
+
+ if volume.IsComplete {
+ completeVolumes++
+ } else {
+ incompleteVolumes++
+ }
+ }
+
+ // Convert map to slice
+ var ecVolumes []EcVolumeWithShards
+ for _, volume := range volumeData {
+ // Filter by collection if specified
+ if collection == "" || volume.Collection == collection {
+ ecVolumes = append(ecVolumes, *volume)
+ }
+ }
+
+ // Sort the results
+ sortEcVolumes(ecVolumes, sortBy, sortOrder)
+
+ // Calculate statistics for conditional display
+ dataCenters := make(map[string]bool)
+ collections := make(map[string]bool)
+
+ for _, volume := range ecVolumes {
+ for _, dc := range volume.DataCenters {
+ dataCenters[dc] = true
+ }
+ if volume.Collection != "" {
+ collections[volume.Collection] = true
+ }
+ }
+
+ // Pagination
+ totalVolumes := len(ecVolumes)
+ totalPages := (totalVolumes + pageSize - 1) / pageSize
+ startIndex := (page - 1) * pageSize
+ endIndex := startIndex + pageSize
+ if endIndex > totalVolumes {
+ endIndex = totalVolumes
+ }
+
+ if startIndex >= totalVolumes {
+ startIndex = 0
+ endIndex = 0
+ }
+
+ paginatedVolumes := ecVolumes[startIndex:endIndex]
+
+ // Build response
+ data := &ClusterEcVolumesData{
+ EcVolumes: paginatedVolumes,
+ TotalVolumes: totalVolumes,
+ LastUpdated: time.Now(),
+
+ // Pagination
+ Page: page,
+ PageSize: pageSize,
+ TotalPages: totalPages,
+
+ // Sorting
+ SortBy: sortBy,
+ SortOrder: sortOrder,
+
+ // Filtering
+ Collection: collection,
+
+ // Conditional display flags
+ ShowDataCenterColumn: len(dataCenters) > 1,
+		ShowRackColumn:       false, // racks are collected per volume but not shown as a column in this view
+ ShowCollectionColumn: len(collections) > 1 || collection != "",
+
+ // Statistics
+ CompleteVolumes: completeVolumes,
+ IncompleteVolumes: incompleteVolumes,
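+		// Note: totalShards was accumulated while walking the topology, i.e.
+		// across all volumes, before any collection filtering was applied.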
+ TotalShards: totalShards,
+ }
+
+ return data, nil
+}
+
+// sortEcVolumes sorts EC volumes based on the specified field and order
+func sortEcVolumes(volumes []EcVolumeWithShards, sortBy string, sortOrder string) {
+ sort.Slice(volumes, func(i, j int) bool {
+ var less bool
+ switch sortBy {
+ case "volume_id":
+ less = volumes[i].VolumeID < volumes[j].VolumeID
+ case "collection":
+ if volumes[i].Collection == volumes[j].Collection {
+ less = volumes[i].VolumeID < volumes[j].VolumeID
+ } else {
+ less = volumes[i].Collection < volumes[j].Collection
+ }
+ case "total_shards":
+ if volumes[i].TotalShards == volumes[j].TotalShards {
+ less = volumes[i].VolumeID < volumes[j].VolumeID
+ } else {
+ less = volumes[i].TotalShards < volumes[j].TotalShards
+ }
+ case "completeness":
+ // Complete volumes first, then by volume ID
+ if volumes[i].IsComplete == volumes[j].IsComplete {
+ less = volumes[i].VolumeID < volumes[j].VolumeID
+ } else {
+ less = volumes[i].IsComplete && !volumes[j].IsComplete
+ }
+ default:
+ less = volumes[i].VolumeID < volumes[j].VolumeID
+ }
+
+ if sortOrder == "desc" {
+ return !less
+ }
+ return less
+ })
+}
+
+// getShardCount returns the number of shards represented by the bitmap
+func getShardCount(ecIndexBits uint32) int {
+ count := 0
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ if (ecIndexBits & (1 << uint(i))) != 0 {
+ count++
+ }
+ }
+ return count
+}
+
+// getMissingShards returns the shard IDs that are absent from the given bitmap
+func getMissingShards(ecIndexBits uint32) []int {
+ var missing []int
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ if (ecIndexBits & (1 << uint(i))) == 0 {
+ missing = append(missing, i)
+ }
+ }
+ return missing
+}
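+
+// Example (illustrative): a server holding shards 0-3 of a volume reports
+// ecIndexBits = 0b1111, so getShardCount(0b1111) == 4 and
+// getMissingShards(0b1111) == [4 5 6 7 8 9 10 11 12 13].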
+
+// sortEcShards sorts EC shards based on the specified field and order
+func sortEcShards(shards []EcShardWithInfo, sortBy string, sortOrder string) {
+ sort.Slice(shards, func(i, j int) bool {
+ var less bool
+ switch sortBy {
+ case "shard_id":
+			if shards[i].ShardID == shards[j].ShardID {
+				less = shards[i].VolumeID < shards[j].VolumeID // Secondary sort by volume ID
+			} else {
+				less = shards[i].ShardID < shards[j].ShardID
+			}
+ case "server":
+ if shards[i].Server == shards[j].Server {
+ less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
+ } else {
+ less = shards[i].Server < shards[j].Server
+ }
+ case "data_center":
+ if shards[i].DataCenter == shards[j].DataCenter {
+ less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
+ } else {
+ less = shards[i].DataCenter < shards[j].DataCenter
+ }
+ case "rack":
+ if shards[i].Rack == shards[j].Rack {
+ less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
+ } else {
+ less = shards[i].Rack < shards[j].Rack
+ }
+ default:
+			if shards[i].ShardID == shards[j].ShardID {
+				less = shards[i].VolumeID < shards[j].VolumeID // Secondary sort by volume ID
+			} else {
+				less = shards[i].ShardID < shards[j].ShardID
+			}
+ }
+
+ if sortOrder == "desc" {
+ return !less
+ }
+ return less
+ })
+}
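+
+// Note: "desc" is implemented by negating the comparator, so the secondary
+// tie-break fields are inverted together with the primary sort field.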
+
+// GetEcVolumeDetails retrieves detailed information about a specific EC volume
+func (s *AdminServer) GetEcVolumeDetails(volumeID uint32, sortBy string, sortOrder string) (*EcVolumeDetailsData, error) {
+ // Set defaults
+ if sortBy == "" {
+ sortBy = "shard_id"
+ }
+ if sortOrder == "" {
+ sortOrder = "asc"
+ }
+
+ var shards []EcShardWithInfo
+ var collection string
+ dataCenters := make(map[string]bool)
+ servers := make(map[string]bool)
+
+ // Get detailed EC shard information for the specific volume via gRPC
+ err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ if err != nil {
+ return err
+ }
+
+ if resp.TopologyInfo != nil {
+ for _, dc := range resp.TopologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, node := range rack.DataNodeInfos {
+ for _, diskInfo := range node.DiskInfos {
+ // Process EC shard information for this specific volume
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ if ecShardInfo.Id == volumeID {
+ collection = ecShardInfo.Collection
+ dataCenters[dc.Id] = true
+ servers[node.Id] = true
+
+ // Create individual shard entries for each shard this server has
+ shardBits := ecShardInfo.EcIndexBits
+ for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
+ if (shardBits & (1 << uint(shardId))) != 0 {
+ ecShard := EcShardWithInfo{
+ VolumeID: ecShardInfo.Id,
+ ShardID: uint32(shardId),
+ Collection: ecShardInfo.Collection,
+									Size:         0, // filled in below from the volume servers
+ Server: node.Id,
+ DataCenter: dc.Id,
+ Rack: rack.Id,
+ DiskType: diskInfo.Type,
+ ModifiedTime: 0, // Not available in current API
+ EcIndexBits: ecShardInfo.EcIndexBits,
+ ShardCount: getShardCount(ecShardInfo.EcIndexBits),
+ }
+ shards = append(shards, ecShard)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ if len(shards) == 0 {
+ return nil, fmt.Errorf("EC volume %d not found", volumeID)
+ }
+
+ // Collect shard size information from volume servers
+ shardSizeMap := make(map[string]map[uint32]uint64) // server -> shardId -> size
+ for _, shard := range shards {
+ server := shard.Server
+		if _, queried := shardSizeMap[server]; !queried {
+			// Mark this server as queried before the RPC so a failure is not
+			// retried for every remaining shard on the same server
+			shardSizeMap[server] = make(map[uint32]uint64)
+			// Query this server for shard sizes
+ err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
+ resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
+ VolumeId: volumeID,
+ })
+ if err != nil {
+ glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeID, err)
+ return nil // Continue with other servers, don't fail the entire request
+ }
+
+				// Store shard sizes for this server
+ for _, shardInfo := range resp.EcShardInfos {
+ shardSizeMap[server][shardInfo.ShardId] = uint64(shardInfo.Size)
+ }
+
+ return nil
+ })
+ if err != nil {
+ glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
+ }
+ }
+ }
+
+ // Update shard sizes in the shards array
+ for i := range shards {
+ server := shards[i].Server
+ shardId := shards[i].ShardID
+ if serverSizes, exists := shardSizeMap[server]; exists {
+ if size, exists := serverSizes[shardId]; exists {
+ shards[i].Size = size
+ }
+ }
+ }
+
+ // Calculate completeness based on unique shard IDs
+ foundShards := make(map[int]bool)
+ for _, shard := range shards {
+ foundShards[int(shard.ShardID)] = true
+ }
+
+ totalUniqueShards := len(foundShards)
+ isComplete := (totalUniqueShards == erasure_coding.TotalShardsCount)
+
+ // Calculate missing shards
+ var missingShards []int
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ if !foundShards[i] {
+ missingShards = append(missingShards, i)
+ }
+ }
+
+ // Update completeness info for each shard
+ for i := range shards {
+ shards[i].IsComplete = isComplete
+ shards[i].MissingShards = missingShards
+ }
+
+ // Sort shards based on parameters
+ sortEcShards(shards, sortBy, sortOrder)
+
+ // Convert maps to slices
+ var dcList []string
+ for dc := range dataCenters {
+ dcList = append(dcList, dc)
+ }
+ var serverList []string
+ for server := range servers {
+ serverList = append(serverList, server)
+ }
+
+ data := &EcVolumeDetailsData{
+ VolumeID: volumeID,
+ Collection: collection,
+ Shards: shards,
+ TotalShards: totalUniqueShards,
+ IsComplete: isComplete,
+ MissingShards: missingShards,
+ DataCenters: dcList,
+ Servers: serverList,
+ LastUpdated: time.Now(),
+ SortBy: sortBy,
+ SortOrder: sortOrder,
+ }
+
+ return data, nil
+}
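+
+// Note: every EcShardWithInfo row returned for a volume carries the same
+// volume-level IsComplete/MissingShards values; Size is filled per shard from
+// the owning volume server when it is reachable, and stays 0 otherwise.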
diff --git a/weed/admin/dash/middleware.go b/weed/admin/dash/middleware.go
index ce538d7ca..a4cfedfd0 100644
--- a/weed/admin/dash/middleware.go
+++ b/weed/admin/dash/middleware.go
@@ -25,3 +25,26 @@ func RequireAuth() gin.HandlerFunc {
c.Next()
}
}
+
+// RequireAuthAPI checks if user is authenticated for API endpoints
+// Returns JSON error instead of redirecting to login page
+func RequireAuthAPI() gin.HandlerFunc {
+ return func(c *gin.Context) {
+ session := sessions.Default(c)
+ authenticated := session.Get("authenticated")
+ username := session.Get("username")
+
+ if authenticated != true || username == nil {
+ c.JSON(http.StatusUnauthorized, gin.H{
+ "error": "Authentication required",
+ "message": "Please log in to access this endpoint",
+ })
+ c.Abort()
+ return
+ }
+
+ // Set username in context for use in handlers
+ c.Set("username", username)
+ c.Next()
+ }
+}
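+
+// Usage sketch (illustrative): JSON API routes can be grouped under this
+// middleware so auth failures return 401 JSON instead of a redirect; the
+// route and handler names are hypothetical:
+//
+//	api := r.Group("/api", RequireAuthAPI())
+//	api.GET("/cluster/ec-shards", adminServer.ShowClusterEcShards)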
diff --git a/weed/admin/dash/types.go b/weed/admin/dash/types.go
index 60f499229..f098fad8c 100644
--- a/weed/admin/dash/types.go
+++ b/weed/admin/dash/types.go
@@ -135,6 +135,84 @@ type ClusterVolumesData struct {
FilterCollection string `json:"filter_collection"`
}
+// ClusterEcShardsData represents the data for the cluster EC shards page
+type ClusterEcShardsData struct {
+ Username string `json:"username"`
+ EcShards []EcShardWithInfo `json:"ec_shards"`
+ TotalShards int `json:"total_shards"`
+ TotalVolumes int `json:"total_volumes"`
+ LastUpdated time.Time `json:"last_updated"`
+
+ // Pagination
+ CurrentPage int `json:"current_page"`
+ TotalPages int `json:"total_pages"`
+ PageSize int `json:"page_size"`
+
+ // Sorting
+ SortBy string `json:"sort_by"`
+ SortOrder string `json:"sort_order"`
+
+ // Statistics
+ DataCenterCount int `json:"datacenter_count"`
+ RackCount int `json:"rack_count"`
+ CollectionCount int `json:"collection_count"`
+
+ // Conditional display flags
+ ShowDataCenterColumn bool `json:"show_datacenter_column"`
+ ShowRackColumn bool `json:"show_rack_column"`
+ ShowCollectionColumn bool `json:"show_collection_column"`
+
+ // Single values when only one exists
+ SingleDataCenter string `json:"single_datacenter"`
+ SingleRack string `json:"single_rack"`
+ SingleCollection string `json:"single_collection"`
+
+ // Filtering
+ FilterCollection string `json:"filter_collection"`
+
+ // EC specific statistics
+ ShardsPerVolume map[uint32]int `json:"shards_per_volume"` // VolumeID -> shard count
+ VolumesWithAllShards int `json:"volumes_with_all_shards"` // Volumes with all 14 shards
+ VolumesWithMissingShards int `json:"volumes_with_missing_shards"` // Volumes missing shards
+}
+
+// EcShardWithInfo represents an EC shard with its topology information
+type EcShardWithInfo struct {
+ VolumeID uint32 `json:"volume_id"`
+ ShardID uint32 `json:"shard_id"`
+ Collection string `json:"collection"`
+ Size uint64 `json:"size"`
+ Server string `json:"server"`
+ DataCenter string `json:"datacenter"`
+ Rack string `json:"rack"`
+ DiskType string `json:"disk_type"`
+ ModifiedTime int64 `json:"modified_time"`
+
+ // EC specific fields
+ EcIndexBits uint32 `json:"ec_index_bits"` // Bitmap of which shards this server has
+ ShardCount int `json:"shard_count"` // Number of shards this server has for this volume
+ IsComplete bool `json:"is_complete"` // True if this volume has all 14 shards
+ MissingShards []int `json:"missing_shards"` // List of missing shard IDs
+}
+
+// EcVolumeDetailsData represents the data for the EC volume details page
+type EcVolumeDetailsData struct {
+ Username string `json:"username"`
+ VolumeID uint32 `json:"volume_id"`
+ Collection string `json:"collection"`
+ Shards []EcShardWithInfo `json:"shards"`
+ TotalShards int `json:"total_shards"`
+ IsComplete bool `json:"is_complete"`
+ MissingShards []int `json:"missing_shards"`
+ DataCenters []string `json:"datacenters"`
+ Servers []string `json:"servers"`
+ LastUpdated time.Time `json:"last_updated"`
+
+ // Sorting
+ SortBy string `json:"sort_by"`
+ SortOrder string `json:"sort_order"`
+}
+
type VolumeDetailsData struct {
Volume VolumeWithTopology `json:"volume"`
Replicas []VolumeWithTopology `json:"replicas"`
@@ -145,12 +223,13 @@ type VolumeDetailsData struct {
// Collection management structures
type CollectionInfo struct {
- Name string `json:"name"`
- DataCenter string `json:"datacenter"`
- VolumeCount int `json:"volume_count"`
- FileCount int64 `json:"file_count"`
- TotalSize int64 `json:"total_size"`
- DiskTypes []string `json:"disk_types"`
+ Name string `json:"name"`
+ DataCenter string `json:"datacenter"`
+ VolumeCount int `json:"volume_count"`
+ EcVolumeCount int `json:"ec_volume_count"`
+ FileCount int64 `json:"file_count"`
+ TotalSize int64 `json:"total_size"`
+ DiskTypes []string `json:"disk_types"`
}
type ClusterCollectionsData struct {
@@ -158,6 +237,7 @@ type ClusterCollectionsData struct {
Collections []CollectionInfo `json:"collections"`
TotalCollections int `json:"total_collections"`
TotalVolumes int `json:"total_volumes"`
+ TotalEcVolumes int `json:"total_ec_volumes"`
TotalFiles int64 `json:"total_files"`
TotalSize int64 `json:"total_size"`
LastUpdated time.Time `json:"last_updated"`
@@ -376,3 +456,74 @@ type MaintenanceWorkersData struct {
}
// Maintenance system types are now in weed/admin/maintenance package
+
+// EcVolumeWithShards represents an EC volume with its shard distribution
+type EcVolumeWithShards struct {
+ VolumeID uint32 `json:"volume_id"`
+ Collection string `json:"collection"`
+ TotalShards int `json:"total_shards"`
+ IsComplete bool `json:"is_complete"`
+ MissingShards []int `json:"missing_shards"`
+ ShardLocations map[int]string `json:"shard_locations"` // shardId -> server
+ ShardSizes map[int]int64 `json:"shard_sizes"` // shardId -> size in bytes
+ DataCenters []string `json:"data_centers"`
+ Servers []string `json:"servers"`
+ Racks []string `json:"racks"`
+ ModifiedTime int64 `json:"modified_time"`
+}
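+
+// Example (illustrative): a volume with only shards 0 and 1 present might be
+// represented as
+//
+//	EcVolumeWithShards{
+//	    VolumeID:       7,
+//	    Collection:     "default",
+//	    TotalShards:    2,
+//	    IsComplete:     false,
+//	    MissingShards:  []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13},
+//	    ShardLocations: map[int]string{0: "10.0.0.1:8080", 1: "10.0.0.2:8080"},
+//	    ShardSizes:     map[int]int64{0: 1 << 20, 1: 1 << 20},
+//	}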
+
+// ClusterEcVolumesData represents the response for clustered EC volumes view
+type ClusterEcVolumesData struct {
+ EcVolumes []EcVolumeWithShards `json:"ec_volumes"`
+ TotalVolumes int `json:"total_volumes"`
+ LastUpdated time.Time `json:"last_updated"`
+
+ // Pagination
+ Page int `json:"page"`
+ PageSize int `json:"page_size"`
+ TotalPages int `json:"total_pages"`
+
+ // Sorting
+ SortBy string `json:"sort_by"`
+ SortOrder string `json:"sort_order"`
+
+ // Filtering
+ Collection string `json:"collection"`
+
+ // Conditional display flags
+ ShowDataCenterColumn bool `json:"show_datacenter_column"`
+ ShowRackColumn bool `json:"show_rack_column"`
+ ShowCollectionColumn bool `json:"show_collection_column"`
+
+ // Statistics
+ CompleteVolumes int `json:"complete_volumes"`
+ IncompleteVolumes int `json:"incomplete_volumes"`
+ TotalShards int `json:"total_shards"`
+
+ // User context
+ Username string `json:"username"`
+}
+
+// Collection detail page structures
+type CollectionDetailsData struct {
+ Username string `json:"username"`
+ CollectionName string `json:"collection_name"`
+ RegularVolumes []VolumeWithTopology `json:"regular_volumes"`
+ EcVolumes []EcVolumeWithShards `json:"ec_volumes"`
+ TotalVolumes int `json:"total_volumes"`
+ TotalEcVolumes int `json:"total_ec_volumes"`
+ TotalFiles int64 `json:"total_files"`
+ TotalSize int64 `json:"total_size"`
+ DataCenters []string `json:"data_centers"`
+ DiskTypes []string `json:"disk_types"`
+ LastUpdated time.Time `json:"last_updated"`
+
+ // Pagination
+ Page int `json:"page"`
+ PageSize int `json:"page_size"`
+ TotalPages int `json:"total_pages"`
+
+ // Sorting
+ SortBy string `json:"sort_by"`
+ SortOrder string `json:"sort_order"`
+}
diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go
index 36f97261a..3b4312235 100644
--- a/weed/admin/dash/worker_grpc_server.go
+++ b/weed/admin/dash/worker_grpc_server.go
@@ -319,27 +319,41 @@ func (s *WorkerGrpcServer) handleHeartbeat(conn *WorkerConnection, heartbeat *wo
// handleTaskRequest processes task requests from workers
func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *worker_pb.TaskRequest) {
+ // glog.Infof("DEBUG handleTaskRequest: Worker %s requesting tasks with capabilities %v", conn.workerID, conn.capabilities)
+
if s.adminServer.maintenanceManager == nil {
+ glog.Infof("DEBUG handleTaskRequest: maintenance manager is nil")
return
}
// Get next task from maintenance manager
task := s.adminServer.maintenanceManager.GetNextTask(conn.workerID, conn.capabilities)
+ // glog.Infof("DEBUG handleTaskRequest: GetNextTask returned task: %v", task != nil)
if task != nil {
+ glog.Infof("DEBUG handleTaskRequest: Assigning task %s (type: %s) to worker %s", task.ID, task.Type, conn.workerID)
+
+ // Use typed params directly - master client should already be configured in the params
+ var taskParams *worker_pb.TaskParams
+ if task.TypedParams != nil {
+ taskParams = task.TypedParams
+ } else {
+ // Create basic params if none exist
+ taskParams = &worker_pb.TaskParams{
+ VolumeId: task.VolumeID,
+ Server: task.Server,
+ Collection: task.Collection,
+ }
+ }
+
// Send task assignment
assignment := &worker_pb.AdminMessage{
Timestamp: time.Now().Unix(),
Message: &worker_pb.AdminMessage_TaskAssignment{
TaskAssignment: &worker_pb.TaskAssignment{
- TaskId: task.ID,
- TaskType: string(task.Type),
- Params: &worker_pb.TaskParams{
- VolumeId: task.VolumeID,
- Server: task.Server,
- Collection: task.Collection,
- Parameters: convertTaskParameters(task.Parameters),
- },
+ TaskId: task.ID,
+ TaskType: string(task.Type),
+ Params: taskParams,
Priority: int32(task.Priority),
CreatedTime: time.Now().Unix(),
},
@@ -348,10 +362,12 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo
select {
case conn.outgoing <- assignment:
- glog.V(2).Infof("Assigned task %s to worker %s", task.ID, conn.workerID)
+ glog.Infof("DEBUG handleTaskRequest: Successfully assigned task %s to worker %s", task.ID, conn.workerID)
case <-time.After(time.Second):
glog.Warningf("Failed to send task assignment to worker %s", conn.workerID)
}
+	} else {
+		glog.V(3).Infof("no tasks available for worker %s", conn.workerID)
+	}
}