diff options
Diffstat (limited to 'weed/admin/maintenance/maintenance_scanner.go')
| -rw-r--r-- | weed/admin/maintenance/maintenance_scanner.go | 98 |
1 files changed, 89 insertions, 9 deletions
diff --git a/weed/admin/maintenance/maintenance_scanner.go b/weed/admin/maintenance/maintenance_scanner.go index 271765ef8..ef41b78ed 100644 --- a/weed/admin/maintenance/maintenance_scanner.go +++ b/weed/admin/maintenance/maintenance_scanner.go @@ -43,7 +43,18 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, // Convert metrics to task system format taskMetrics := ms.convertToTaskMetrics(volumeMetrics) - // Use task detection system + // Update topology information for complete cluster view (including empty servers) + // This must happen before task detection to ensure EC placement can consider all servers + if ms.lastTopologyInfo != nil { + if err := ms.integration.UpdateTopologyInfo(ms.lastTopologyInfo); err != nil { + glog.Errorf("Failed to update topology info for empty servers: %v", err) + // Don't fail the scan - continue with just volume-bearing servers + } else { + glog.V(1).Infof("Updated topology info for complete cluster view including empty servers") + } + } + + // Use task detection system with complete cluster information results, err := ms.integration.ScanWithTaskDetectors(taskMetrics) if err != nil { glog.Errorf("Task scanning failed: %v", err) @@ -62,25 +73,60 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, // getVolumeHealthMetrics collects health information for all volumes func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) { var metrics []*VolumeHealthMetrics + var volumeSizeLimitMB uint64 + glog.V(1).Infof("Collecting volume health metrics from master") err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error { + // First, get volume size limit from master configuration + configResp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + glog.Warningf("Failed to get volume size limit from master: %v", err) + volumeSizeLimitMB = 30000 // Default to 30GB if we can't get from master + } else { + volumeSizeLimitMB = uint64(configResp.VolumeSizeLimitMB) + } + + // Now get volume list resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) if err != nil { return err } if resp.TopologyInfo == nil { + glog.Warningf("No topology info received from master") return nil } + volumeSizeLimitBytes := volumeSizeLimitMB * 1024 * 1024 // Convert MB to bytes + + // Track all nodes discovered in topology + var allNodesInTopology []string + var nodesWithVolumes []string + var nodesWithoutVolumes []string + for _, dc := range resp.TopologyInfo.DataCenterInfos { + glog.V(2).Infof("Processing datacenter: %s", dc.Id) for _, rack := range dc.RackInfos { + glog.V(2).Infof("Processing rack: %s in datacenter: %s", rack.Id, dc.Id) for _, node := range rack.DataNodeInfos { - for _, diskInfo := range node.DiskInfos { + allNodesInTopology = append(allNodesInTopology, node.Id) + glog.V(2).Infof("Found volume server in topology: %s (disks: %d)", node.Id, len(node.DiskInfos)) + + hasVolumes := false + // Process each disk on this node + for diskType, diskInfo := range node.DiskInfos { + if len(diskInfo.VolumeInfos) > 0 { + hasVolumes = true + glog.V(2).Infof("Volume server %s disk %s has %d volumes", node.Id, diskType, len(diskInfo.VolumeInfos)) + } + + // Process volumes on this specific disk for _, volInfo := range diskInfo.VolumeInfos { metric := &VolumeHealthMetrics{ VolumeID: volInfo.Id, Server: node.Id, + DiskType: diskType, // Track which disk this volume is on + DiskId: volInfo.DiskId, // Use disk ID from volume info Collection: volInfo.Collection, Size: volInfo.Size, DeletedBytes: volInfo.DeletedByteCount, @@ -94,31 +140,58 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, // Calculate derived metrics if metric.Size > 0 { metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size) - // Calculate fullness ratio (would need volume size limit) - // metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimit) + // Calculate fullness ratio using actual volume size limit from master + metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimitBytes) } metric.Age = time.Since(metric.LastModified) + glog.V(3).Infof("Volume %d on %s:%s (ID %d): size=%d, limit=%d, fullness=%.2f", + metric.VolumeID, metric.Server, metric.DiskType, metric.DiskId, metric.Size, volumeSizeLimitBytes, metric.FullnessRatio) + metrics = append(metrics, metric) } } + + if hasVolumes { + nodesWithVolumes = append(nodesWithVolumes, node.Id) + } else { + nodesWithoutVolumes = append(nodesWithoutVolumes, node.Id) + glog.V(1).Infof("Volume server %s found in topology but has no volumes", node.Id) + } } } } + glog.Infof("Topology discovery complete:") + glog.Infof(" - Total volume servers in topology: %d (%v)", len(allNodesInTopology), allNodesInTopology) + glog.Infof(" - Volume servers with volumes: %d (%v)", len(nodesWithVolumes), nodesWithVolumes) + glog.Infof(" - Volume servers without volumes: %d (%v)", len(nodesWithoutVolumes), nodesWithoutVolumes) + glog.Infof("Note: Maintenance system will track empty servers separately from volume metrics.") + + // Store topology info for volume shard tracker + ms.lastTopologyInfo = resp.TopologyInfo + return nil }) if err != nil { + glog.Errorf("Failed to get volume health metrics: %v", err) return nil, err } + glog.V(1).Infof("Successfully collected metrics for %d actual volumes with disk ID information", len(metrics)) + // Count actual replicas and identify EC volumes ms.enrichVolumeMetrics(metrics) return metrics, nil } +// getTopologyInfo returns the last collected topology information +func (ms *MaintenanceScanner) getTopologyInfo() *master_pb.TopologyInfo { + return ms.lastTopologyInfo +} + // enrichVolumeMetrics adds additional information like replica counts func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) { // Group volumes by ID to count replicas @@ -127,13 +200,17 @@ func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric) } - // Update replica counts - for _, group := range volumeGroups { - actualReplicas := len(group) - for _, metric := range group { - metric.ReplicaCount = actualReplicas + // Update replica counts for actual volumes + for volumeID, replicas := range volumeGroups { + replicaCount := len(replicas) + for _, replica := range replicas { + replica.ReplicaCount = replicaCount } + glog.V(3).Infof("Volume %d has %d replicas", volumeID, replicaCount) } + + // TODO: Identify EC volumes by checking volume structure + // This would require querying volume servers for EC shard information } // convertToTaskMetrics converts existing volume metrics to task system format @@ -144,6 +221,8 @@ func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetric simplified = append(simplified, &types.VolumeHealthMetrics{ VolumeID: metric.VolumeID, Server: metric.Server, + DiskType: metric.DiskType, + DiskId: metric.DiskId, Collection: metric.Collection, Size: metric.Size, DeletedBytes: metric.DeletedBytes, @@ -159,5 +238,6 @@ func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetric }) } + glog.V(2).Infof("Converted %d volume metrics with disk ID information for task detection", len(simplified)) return simplified } |
