aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/vacuum/detection.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/vacuum/detection.go')
-rw-r--r--weed/worker/tasks/vacuum/detection.go65
1 files changed, 33 insertions, 32 deletions
diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go
index 7b5a1baf0..23f82ad34 100644
--- a/weed/worker/tasks/vacuum/detection.go
+++ b/weed/worker/tasks/vacuum/detection.go
@@ -4,6 +4,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -39,6 +40,9 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
Reason: "Volume has excessive garbage requiring vacuum",
ScheduleAt: time.Now(),
}
+
+ // Create typed parameters for vacuum task
+ result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig)
results = append(results, result)
} else {
// Debug why volume was not selected
@@ -74,39 +78,36 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
return results, nil
}
-// Scheduling implements the scheduling logic for vacuum tasks
-func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
- vacuumConfig := config.(*Config)
-
- // Count running vacuum tasks
- runningVacuumCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeVacuum {
- runningVacuumCount++
- }
- }
-
- // Check concurrency limit
- if runningVacuumCount >= vacuumConfig.MaxConcurrent {
- return false
+// createVacuumTaskParams creates typed parameters for vacuum tasks
+// This function is moved from MaintenanceIntegration.createVacuumTaskParams to the detection logic
+func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.VolumeHealthMetrics, vacuumConfig *Config) *worker_pb.TaskParams {
+ // Use configured values or defaults
+ garbageThreshold := 0.3 // Default 30%
+ verifyChecksum := true // Default to verify
+ batchSize := int32(1000) // Default batch size
+ workingDir := "/tmp/seaweedfs_vacuum_work" // Default working directory
+
+ if vacuumConfig != nil {
+ garbageThreshold = vacuumConfig.GarbageThreshold
+ // Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours, MinIntervalSeconds
+ // Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added
+ // to the protobuf definition if they should be configurable
}
- // Check for available workers with vacuum capability
- for _, worker := range availableWorkers {
- if worker.CurrentLoad < worker.MaxConcurrent {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeVacuum {
- return true
- }
- }
- }
+ // Create typed protobuf parameters
+ return &worker_pb.TaskParams{
+ VolumeId: task.VolumeID,
+ Server: task.Server,
+ Collection: task.Collection,
+ VolumeSize: metric.Size, // Store original volume size for tracking changes
+ TaskParams: &worker_pb.TaskParams_VacuumParams{
+ VacuumParams: &worker_pb.VacuumTaskParams{
+ GarbageThreshold: garbageThreshold,
+ ForceVacuum: false,
+ BatchSize: batchSize,
+ WorkingDir: workingDir,
+ VerifyChecksum: verifyChecksum,
+ },
+ },
}
-
- return false
-}
-
-// CreateTask creates a new vacuum task instance
-func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
- // Create and return the vacuum task using existing Task type
- return NewTask(params.Server, params.VolumeID), nil
}