about | summary | refs | log | tree | commit | diff
path: root/weed/admin/maintenance/maintenance_scanner.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/maintenance/maintenance_scanner.go')
-rw-r--r-- weed/admin/maintenance/maintenance_scanner.go 98
1 files changed, 89 insertions, 9 deletions
diff --git a/weed/admin/maintenance/maintenance_scanner.go b/weed/admin/maintenance/maintenance_scanner.go
index 271765ef8..ef41b78ed 100644
--- a/weed/admin/maintenance/maintenance_scanner.go
+++ b/weed/admin/maintenance/maintenance_scanner.go
@@ -43,7 +43,18 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult,
// Convert metrics to task system format
taskMetrics := ms.convertToTaskMetrics(volumeMetrics)
- // Use task detection system
+ // Update topology information for complete cluster view (including empty servers)
+ // This must happen before task detection to ensure EC placement can consider all servers
+ if ms.lastTopologyInfo != nil {
+ if err := ms.integration.UpdateTopologyInfo(ms.lastTopologyInfo); err != nil {
+ glog.Errorf("Failed to update topology info for empty servers: %v", err)
+ // Don't fail the scan - continue with just volume-bearing servers
+ } else {
+ glog.V(1).Infof("Updated topology info for complete cluster view including empty servers")
+ }
+ }
+
+ // Use task detection system with complete cluster information
results, err := ms.integration.ScanWithTaskDetectors(taskMetrics)
if err != nil {
glog.Errorf("Task scanning failed: %v", err)
@@ -62,25 +73,60 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult,
// getVolumeHealthMetrics collects health information for all volumes
func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) {
var metrics []*VolumeHealthMetrics
+ var volumeSizeLimitMB uint64
+ glog.V(1).Infof("Collecting volume health metrics from master")
err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
+ // First, get volume size limit from master configuration
+ configResp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ glog.Warningf("Failed to get volume size limit from master: %v", err)
+ volumeSizeLimitMB = 30000 // Default to 30GB if we can't get from master
+ } else {
+ volumeSizeLimitMB = uint64(configResp.VolumeSizeLimitMB)
+ }
+
+ // Now get volume list
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
if resp.TopologyInfo == nil {
+ glog.Warningf("No topology info received from master")
return nil
}
+ volumeSizeLimitBytes := volumeSizeLimitMB * 1024 * 1024 // Convert MB to bytes
+
+ // Track all nodes discovered in topology
+ var allNodesInTopology []string
+ var nodesWithVolumes []string
+ var nodesWithoutVolumes []string
+
for _, dc := range resp.TopologyInfo.DataCenterInfos {
+ glog.V(2).Infof("Processing datacenter: %s", dc.Id)
for _, rack := range dc.RackInfos {
+ glog.V(2).Infof("Processing rack: %s in datacenter: %s", rack.Id, dc.Id)
for _, node := range rack.DataNodeInfos {
- for _, diskInfo := range node.DiskInfos {
+ allNodesInTopology = append(allNodesInTopology, node.Id)
+ glog.V(2).Infof("Found volume server in topology: %s (disks: %d)", node.Id, len(node.DiskInfos))
+
+ hasVolumes := false
+ // Process each disk on this node
+ for diskType, diskInfo := range node.DiskInfos {
+ if len(diskInfo.VolumeInfos) > 0 {
+ hasVolumes = true
+ glog.V(2).Infof("Volume server %s disk %s has %d volumes", node.Id, diskType, len(diskInfo.VolumeInfos))
+ }
+
+ // Process volumes on this specific disk
for _, volInfo := range diskInfo.VolumeInfos {
metric := &VolumeHealthMetrics{
VolumeID: volInfo.Id,
Server: node.Id,
+ DiskType: diskType, // Track which disk this volume is on
+ DiskId: volInfo.DiskId, // Use disk ID from volume info
Collection: volInfo.Collection,
Size: volInfo.Size,
DeletedBytes: volInfo.DeletedByteCount,
@@ -94,31 +140,58 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics,
// Calculate derived metrics
if metric.Size > 0 {
metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
- // Calculate fullness ratio (would need volume size limit)
- // metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimit)
+ // Calculate fullness ratio using actual volume size limit from master
+ metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimitBytes)
}
metric.Age = time.Since(metric.LastModified)
+ glog.V(3).Infof("Volume %d on %s:%s (ID %d): size=%d, limit=%d, fullness=%.2f",
+ metric.VolumeID, metric.Server, metric.DiskType, metric.DiskId, metric.Size, volumeSizeLimitBytes, metric.FullnessRatio)
+
metrics = append(metrics, metric)
}
}
+
+ if hasVolumes {
+ nodesWithVolumes = append(nodesWithVolumes, node.Id)
+ } else {
+ nodesWithoutVolumes = append(nodesWithoutVolumes, node.Id)
+ glog.V(1).Infof("Volume server %s found in topology but has no volumes", node.Id)
+ }
}
}
}
+ glog.Infof("Topology discovery complete:")
+ glog.Infof(" - Total volume servers in topology: %d (%v)", len(allNodesInTopology), allNodesInTopology)
+ glog.Infof(" - Volume servers with volumes: %d (%v)", len(nodesWithVolumes), nodesWithVolumes)
+ glog.Infof(" - Volume servers without volumes: %d (%v)", len(nodesWithoutVolumes), nodesWithoutVolumes)
+ glog.Infof("Note: Maintenance system will track empty servers separately from volume metrics.")
+
+ // Store topology info for volume shard tracker
+ ms.lastTopologyInfo = resp.TopologyInfo
+
return nil
})
if err != nil {
+ glog.Errorf("Failed to get volume health metrics: %v", err)
return nil, err
}
+ glog.V(1).Infof("Successfully collected metrics for %d actual volumes with disk ID information", len(metrics))
+
// Count actual replicas and identify EC volumes
ms.enrichVolumeMetrics(metrics)
return metrics, nil
}
+// getTopologyInfo returns ms.lastTopologyInfo, the topology snapshot that getVolumeHealthMetrics stores after a VolumeList response with non-nil TopologyInfo; presumably nil until the first such response, so callers should nil-check.
+func (ms *MaintenanceScanner) getTopologyInfo() *master_pb.TopologyInfo {
+ return ms.lastTopologyInfo
+}
+
// enrichVolumeMetrics adds additional information like replica counts
func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) {
// Group volumes by ID to count replicas
@@ -127,13 +200,17 @@ func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics
volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
}
- // Update replica counts
- for _, group := range volumeGroups {
- actualReplicas := len(group)
- for _, metric := range group {
- metric.ReplicaCount = actualReplicas
+ // Update replica counts for actual volumes
+ for volumeID, replicas := range volumeGroups {
+ replicaCount := len(replicas)
+ for _, replica := range replicas {
+ replica.ReplicaCount = replicaCount
}
+ glog.V(3).Infof("Volume %d has %d replicas", volumeID, replicaCount)
}
+
+ // TODO: Identify EC volumes by checking volume structure
+ // This would require querying volume servers for EC shard information
}
// convertToTaskMetrics converts existing volume metrics to task system format
@@ -144,6 +221,8 @@ func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetric
simplified = append(simplified, &types.VolumeHealthMetrics{
VolumeID: metric.VolumeID,
Server: metric.Server,
+ DiskType: metric.DiskType,
+ DiskId: metric.DiskId,
Collection: metric.Collection,
Size: metric.Size,
DeletedBytes: metric.DeletedBytes,
@@ -159,5 +238,6 @@ func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetric
})
}
+ glog.V(2).Infof("Converted %d volume metrics with disk ID information for task detection", len(simplified))
return simplified
}