aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/maintenance/maintenance_scanner.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/maintenance/maintenance_scanner.go')
-rw-r--r--weed/admin/maintenance/maintenance_scanner.go163
1 files changed, 163 insertions, 0 deletions
diff --git a/weed/admin/maintenance/maintenance_scanner.go b/weed/admin/maintenance/maintenance_scanner.go
new file mode 100644
index 000000000..4d7cda125
--- /dev/null
+++ b/weed/admin/maintenance/maintenance_scanner.go
@@ -0,0 +1,163 @@
+package maintenance
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// NewMaintenanceScanner creates a new maintenance scanner
+func NewMaintenanceScanner(adminClient AdminClient, policy *MaintenancePolicy, queue *MaintenanceQueue) *MaintenanceScanner {
+ scanner := &MaintenanceScanner{
+ adminClient: adminClient,
+ policy: policy,
+ queue: queue,
+ lastScan: make(map[MaintenanceTaskType]time.Time),
+ }
+
+ // Initialize integration
+ scanner.integration = NewMaintenanceIntegration(queue, policy)
+
+ // Set up bidirectional relationship
+ queue.SetIntegration(scanner.integration)
+
+ glog.V(1).Infof("Initialized maintenance scanner with task system")
+
+ return scanner
+}
+
+// ScanForMaintenanceTasks analyzes the cluster and generates maintenance tasks
+func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, error) {
+ // Get volume health metrics
+ volumeMetrics, err := ms.getVolumeHealthMetrics()
+ if err != nil {
+ return nil, fmt.Errorf("failed to get volume health metrics: %v", err)
+ }
+
+ // Use task system for all task types
+ if ms.integration != nil {
+ // Convert metrics to task system format
+ taskMetrics := ms.convertToTaskMetrics(volumeMetrics)
+
+ // Use task detection system
+ results, err := ms.integration.ScanWithTaskDetectors(taskMetrics)
+ if err != nil {
+ glog.Errorf("Task scanning failed: %v", err)
+ return nil, err
+ }
+
+ glog.V(1).Infof("Maintenance scan completed: found %d tasks", len(results))
+ return results, nil
+ }
+
+ // No integration available
+ glog.Warningf("No integration available, no tasks will be scheduled")
+ return []*TaskDetectionResult{}, nil
+}
+
+// getVolumeHealthMetrics collects health information for all volumes
+func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) {
+ var metrics []*VolumeHealthMetrics
+
+ err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
+ resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ if err != nil {
+ return err
+ }
+
+ if resp.TopologyInfo == nil {
+ return nil
+ }
+
+ for _, dc := range resp.TopologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, node := range rack.DataNodeInfos {
+ for _, diskInfo := range node.DiskInfos {
+ for _, volInfo := range diskInfo.VolumeInfos {
+ metric := &VolumeHealthMetrics{
+ VolumeID: volInfo.Id,
+ Server: node.Id,
+ Collection: volInfo.Collection,
+ Size: volInfo.Size,
+ DeletedBytes: volInfo.DeletedByteCount,
+ LastModified: time.Unix(int64(volInfo.ModifiedAtSecond), 0),
+ IsReadOnly: volInfo.ReadOnly,
+ IsECVolume: false, // Will be determined from volume structure
+ ReplicaCount: 1, // Will be counted
+ ExpectedReplicas: int(volInfo.ReplicaPlacement),
+ }
+
+ // Calculate derived metrics
+ if metric.Size > 0 {
+ metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
+ // Calculate fullness ratio (would need volume size limit)
+ // metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimit)
+ }
+ metric.Age = time.Since(metric.LastModified)
+
+ metrics = append(metrics, metric)
+ }
+ }
+ }
+ }
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ // Count actual replicas and identify EC volumes
+ ms.enrichVolumeMetrics(metrics)
+
+ return metrics, nil
+}
+
+// enrichVolumeMetrics adds additional information like replica counts
+func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) {
+ // Group volumes by ID to count replicas
+ volumeGroups := make(map[uint32][]*VolumeHealthMetrics)
+ for _, metric := range metrics {
+ volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
+ }
+
+ // Update replica counts
+ for _, group := range volumeGroups {
+ actualReplicas := len(group)
+ for _, metric := range group {
+ metric.ReplicaCount = actualReplicas
+ }
+ }
+}
+
+// convertToTaskMetrics converts existing volume metrics to task system format
+func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetrics) []*types.VolumeHealthMetrics {
+ var simplified []*types.VolumeHealthMetrics
+
+ for _, metric := range metrics {
+ simplified = append(simplified, &types.VolumeHealthMetrics{
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Size: metric.Size,
+ DeletedBytes: metric.DeletedBytes,
+ GarbageRatio: metric.GarbageRatio,
+ LastModified: metric.LastModified,
+ Age: metric.Age,
+ ReplicaCount: metric.ReplicaCount,
+ ExpectedReplicas: metric.ExpectedReplicas,
+ IsReadOnly: metric.IsReadOnly,
+ HasRemoteCopy: metric.HasRemoteCopy,
+ IsECVolume: metric.IsECVolume,
+ FullnessRatio: metric.FullnessRatio,
+ })
+ }
+
+ return simplified
+}