aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/vacuum/detection.go
blob: bd86a2742ad721fd4e0bfe22a39c7fdd9b7a1e61 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package vacuum

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// Detection implements the detection logic for vacuum tasks.
//
// It scans the supplied volume health metrics and emits one
// TaskDetectionResult for every volume whose garbage ratio is at least the
// configured GarbageThreshold and whose age is at least MinVolumeAgeSeconds.
// Volumes whose garbage ratio exceeds 0.6 are scheduled with high priority,
// all others with normal priority. clusterInfo is only forwarded to
// createVacuumTaskParams.
//
// It returns (nil, nil) when the task type is disabled, and an error when
// config is not a *Config. When metrics were supplied but no task was
// created, a summary of why volumes were skipped is logged.
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}

	// Guard the assertion so a mismatched config surfaces as an error
	// instead of a panic inside the detection loop.
	vacuumConfig, ok := config.(*Config)
	if !ok {
		return nil, fmt.Errorf("vacuum detection: unexpected config type %T", config)
	}

	var results []*types.TaskDetectionResult
	minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second

	// Counted over every skipped volume so the summary log below reports
	// real totals; a volume can be counted under both reasons.
	skippedDueToGarbage := 0
	skippedDueToAge := 0

	for _, metric := range metrics {
		if metric == nil {
			continue // nothing to evaluate for a missing metric entry
		}

		// Check if volume needs vacuum
		needsVacuum := metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge
		if !needsVacuum {
			if metric.GarbageRatio < vacuumConfig.GarbageThreshold {
				skippedDueToGarbage++
			}
			if metric.Age < minVolumeAge {
				skippedDueToAge++
			}
			continue
		}

		priority := types.TaskPriorityNormal
		if metric.GarbageRatio > 0.6 {
			priority = types.TaskPriorityHigh
		}

		// Use a single timestamp so the task ID and ScheduleAt agree.
		now := time.Now()

		result := &types.TaskDetectionResult{
			// Generated task ID for future ActiveTopology integration
			TaskID:     fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, now.Unix()),
			TaskType:   types.TaskTypeVacuum,
			VolumeID:   metric.VolumeID,
			Server:     metric.Server,
			Collection: metric.Collection,
			Priority:   priority,
			Reason:     "Volume has excessive garbage requiring vacuum",
			ScheduleAt: now,
		}

		// Create typed parameters for vacuum task
		result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig, clusterInfo)
		results = append(results, result)
	}

	// Log debug summary if no tasks were created
	if len(results) == 0 && len(metrics) > 0 {
		totalVolumes := len(metrics)
		glog.Infof("VACUUM: No tasks created for %d volumes. Threshold=%.2f%%, MinAge=%s. Skipped: %d (garbage<threshold), %d (age<minimum)",
			totalVolumes, vacuumConfig.GarbageThreshold*100, minVolumeAge, skippedDueToGarbage, skippedDueToAge)

		// Show details for the first few volumes only, to keep the log short
		shown := 0
		for _, metric := range metrics {
			if shown >= 3 { // Limit to first 3 volumes
				break
			}
			if metric == nil {
				continue
			}
			glog.Infof("VACUUM: Volume %d: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)",
				metric.VolumeID, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100,
				metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute))
			shown++
		}
	}

	return results, nil
}

// createVacuumTaskParams assembles the typed protobuf parameters for a vacuum task.
// The logic originally lived in MaintenanceIntegration.createVacuumTaskParams and
// was moved into the detection code.
//
// The task result supplies the task ID, volume ID, collection and source server;
// the metric supplies the volume size, data center and rack. Only the garbage
// threshold is read from vacuumConfig (falling back to 0.3 when it is nil).
// clusterInfo is accepted but not used by this function.
func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.VolumeHealthMetrics, vacuumConfig *Config, clusterInfo *types.ClusterInfo) *worker_pb.TaskParams {
	// Start from the default threshold (30%) and override it from the config.
	threshold := 0.3
	if vacuumConfig != nil {
		threshold = vacuumConfig.GarbageThreshold
	}

	// Checksum verification, batch size and working directory are fixed
	// defaults here; per the original note they would have to be added to
	// the protobuf config definition before they could become configurable.
	vacuumSettings := &worker_pb.VacuumTaskParams{
		GarbageThreshold: threshold,
		ForceVacuum:      false,
		BatchSize:        int32(1000),
		WorkingDir:       "/tmp/seaweedfs_vacuum_work",
		VerifyChecksum:   true,
	}

	// Single source entry; DC and rack come straight from the volume metrics.
	source := &worker_pb.TaskSource{
		Node:          task.Server,
		VolumeId:      task.VolumeID,
		EstimatedSize: metric.Size,
		DataCenter:    metric.DataCenter,
		Rack:          metric.Rack,
	}

	params := &worker_pb.TaskParams{
		TaskId:     task.TaskID, // Link to ActiveTopology pending task (if integrated)
		VolumeId:   task.VolumeID,
		Collection: task.Collection,
		VolumeSize: metric.Size, // Original volume size, kept for tracking changes
		Sources:    []*worker_pb.TaskSource{source},
		TaskParams: &worker_pb.TaskParams_VacuumParams{
			VacuumParams: vacuumSettings,
		},
	}
	return params
}