diff options
Diffstat (limited to 'weed/admin/maintenance/maintenance_scanner.go')
| -rw-r--r-- | weed/admin/maintenance/maintenance_scanner.go | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/weed/admin/maintenance/maintenance_scanner.go b/weed/admin/maintenance/maintenance_scanner.go new file mode 100644 index 000000000..4d7cda125 --- /dev/null +++ b/weed/admin/maintenance/maintenance_scanner.go @@ -0,0 +1,163 @@ +package maintenance + +import ( + "context" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// NewMaintenanceScanner creates a new maintenance scanner +func NewMaintenanceScanner(adminClient AdminClient, policy *MaintenancePolicy, queue *MaintenanceQueue) *MaintenanceScanner { + scanner := &MaintenanceScanner{ + adminClient: adminClient, + policy: policy, + queue: queue, + lastScan: make(map[MaintenanceTaskType]time.Time), + } + + // Initialize integration + scanner.integration = NewMaintenanceIntegration(queue, policy) + + // Set up bidirectional relationship + queue.SetIntegration(scanner.integration) + + glog.V(1).Infof("Initialized maintenance scanner with task system") + + return scanner +} + +// ScanForMaintenanceTasks analyzes the cluster and generates maintenance tasks +func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, error) { + // Get volume health metrics + volumeMetrics, err := ms.getVolumeHealthMetrics() + if err != nil { + return nil, fmt.Errorf("failed to get volume health metrics: %v", err) + } + + // Use task system for all task types + if ms.integration != nil { + // Convert metrics to task system format + taskMetrics := ms.convertToTaskMetrics(volumeMetrics) + + // Use task detection system + results, err := ms.integration.ScanWithTaskDetectors(taskMetrics) + if err != nil { + glog.Errorf("Task scanning failed: %v", err) + return nil, err + } + + glog.V(1).Infof("Maintenance scan completed: found %d tasks", len(results)) + return results, nil + } + + // No integration available + glog.Warningf("No integration available, no tasks will be scheduled") + return []*TaskDetectionResult{}, nil +} + +// getVolumeHealthMetrics collects health information for all volumes +func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) { + var metrics []*VolumeHealthMetrics + + err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo == nil { + return nil + } + + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + for _, volInfo := range diskInfo.VolumeInfos { + metric := &VolumeHealthMetrics{ + VolumeID: volInfo.Id, + Server: node.Id, + Collection: volInfo.Collection, + Size: volInfo.Size, + DeletedBytes: volInfo.DeletedByteCount, + LastModified: time.Unix(int64(volInfo.ModifiedAtSecond), 0), + IsReadOnly: volInfo.ReadOnly, + IsECVolume: false, // Will be determined from volume structure + ReplicaCount: 1, // Will be counted + ExpectedReplicas: int(volInfo.ReplicaPlacement), + } + + // Calculate derived metrics + if metric.Size > 0 { + metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size) + // Calculate fullness ratio (would need volume size limit) + // metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimit) + } + metric.Age = time.Since(metric.LastModified) + + metrics = append(metrics, metric) + } + } + } + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + // Count actual replicas and identify EC volumes + ms.enrichVolumeMetrics(metrics) + + return metrics, nil +} + +// enrichVolumeMetrics adds additional information like replica counts +func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) { + // Group volumes by ID to count replicas + volumeGroups := make(map[uint32][]*VolumeHealthMetrics) + for _, metric := range metrics { + volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric) + } + + // Update replica counts + for _, group := range volumeGroups { + actualReplicas := len(group) + for _, metric := range group { + metric.ReplicaCount = actualReplicas + } + } +} + +// convertToTaskMetrics converts existing volume metrics to task system format +func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetrics) []*types.VolumeHealthMetrics { + var simplified []*types.VolumeHealthMetrics + + for _, metric := range metrics { + simplified = append(simplified, &types.VolumeHealthMetrics{ + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Size: metric.Size, + DeletedBytes: metric.DeletedBytes, + GarbageRatio: metric.GarbageRatio, + LastModified: metric.LastModified, + Age: metric.Age, + ReplicaCount: metric.ReplicaCount, + ExpectedReplicas: metric.ExpectedReplicas, + IsReadOnly: metric.IsReadOnly, + HasRemoteCopy: metric.HasRemoteCopy, + IsECVolume: metric.IsECVolume, + FullnessRatio: metric.FullnessRatio, + }) + } + + return simplified +} |
