1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
package maintenance
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// NewMaintenanceScanner creates a new maintenance scanner
func NewMaintenanceScanner(adminClient AdminClient, policy *MaintenancePolicy, queue *MaintenanceQueue) *MaintenanceScanner {
scanner := &MaintenanceScanner{
adminClient: adminClient,
policy: policy,
queue: queue,
lastScan: make(map[MaintenanceTaskType]time.Time),
}
// Initialize integration
scanner.integration = NewMaintenanceIntegration(queue, policy)
// Set up bidirectional relationship
queue.SetIntegration(scanner.integration)
glog.V(1).Infof("Initialized maintenance scanner with task system")
return scanner
}
// ScanForMaintenanceTasks analyzes the cluster and generates maintenance tasks
func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, error) {
// Get volume health metrics
volumeMetrics, err := ms.getVolumeHealthMetrics()
if err != nil {
return nil, fmt.Errorf("failed to get volume health metrics: %w", err)
}
// Use task system for all task types
if ms.integration != nil {
// Convert metrics to task system format
taskMetrics := ms.convertToTaskMetrics(volumeMetrics)
// Use task detection system
results, err := ms.integration.ScanWithTaskDetectors(taskMetrics)
if err != nil {
glog.Errorf("Task scanning failed: %v", err)
return nil, err
}
glog.V(1).Infof("Maintenance scan completed: found %d tasks", len(results))
return results, nil
}
// No integration available
glog.Warningf("No integration available, no tasks will be scheduled")
return []*TaskDetectionResult{}, nil
}
// getVolumeHealthMetrics collects health information for all volumes
func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) {
var metrics []*VolumeHealthMetrics
err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
if resp.TopologyInfo == nil {
return nil
}
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
for _, volInfo := range diskInfo.VolumeInfos {
metric := &VolumeHealthMetrics{
VolumeID: volInfo.Id,
Server: node.Id,
Collection: volInfo.Collection,
Size: volInfo.Size,
DeletedBytes: volInfo.DeletedByteCount,
LastModified: time.Unix(int64(volInfo.ModifiedAtSecond), 0),
IsReadOnly: volInfo.ReadOnly,
IsECVolume: false, // Will be determined from volume structure
ReplicaCount: 1, // Will be counted
ExpectedReplicas: int(volInfo.ReplicaPlacement),
}
// Calculate derived metrics
if metric.Size > 0 {
metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
// Calculate fullness ratio (would need volume size limit)
// metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimit)
}
metric.Age = time.Since(metric.LastModified)
metrics = append(metrics, metric)
}
}
}
}
}
return nil
})
if err != nil {
return nil, err
}
// Count actual replicas and identify EC volumes
ms.enrichVolumeMetrics(metrics)
return metrics, nil
}
// enrichVolumeMetrics adds additional information like replica counts
func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) {
// Group volumes by ID to count replicas
volumeGroups := make(map[uint32][]*VolumeHealthMetrics)
for _, metric := range metrics {
volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
}
// Update replica counts
for _, group := range volumeGroups {
actualReplicas := len(group)
for _, metric := range group {
metric.ReplicaCount = actualReplicas
}
}
}
// convertToTaskMetrics converts existing volume metrics to task system format
func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetrics) []*types.VolumeHealthMetrics {
var simplified []*types.VolumeHealthMetrics
for _, metric := range metrics {
simplified = append(simplified, &types.VolumeHealthMetrics{
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
Size: metric.Size,
DeletedBytes: metric.DeletedBytes,
GarbageRatio: metric.GarbageRatio,
LastModified: metric.LastModified,
Age: metric.Age,
ReplicaCount: metric.ReplicaCount,
ExpectedReplicas: metric.ExpectedReplicas,
IsReadOnly: metric.IsReadOnly,
HasRemoteCopy: metric.HasRemoteCopy,
IsECVolume: metric.IsECVolume,
FullnessRatio: metric.FullnessRatio,
})
}
return simplified
}
|