diff options
Diffstat (limited to 'weed/worker/tasks/vacuum/detection.go')
| -rw-r--r-- | weed/worker/tasks/vacuum/detection.go | 65 |
1 files changed, 33 insertions, 32 deletions
diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go index 7b5a1baf0..23f82ad34 100644 --- a/weed/worker/tasks/vacuum/detection.go +++ b/weed/worker/tasks/vacuum/detection.go @@ -4,6 +4,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -39,6 +40,9 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI Reason: "Volume has excessive garbage requiring vacuum", ScheduleAt: time.Now(), } + + // Create typed parameters for vacuum task + result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig) results = append(results, result) } else { // Debug why volume was not selected @@ -74,39 +78,36 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI return results, nil } -// Scheduling implements the scheduling logic for vacuum tasks -func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { - vacuumConfig := config.(*Config) - - // Count running vacuum tasks - runningVacuumCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeVacuum { - runningVacuumCount++ - } - } - - // Check concurrency limit - if runningVacuumCount >= vacuumConfig.MaxConcurrent { - return false +// createVacuumTaskParams creates typed parameters for vacuum tasks +// This function is moved from MaintenanceIntegration.createVacuumTaskParams to the detection logic +func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.VolumeHealthMetrics, vacuumConfig *Config) *worker_pb.TaskParams { + // Use configured values or defaults + garbageThreshold := 0.3 // Default 30% + verifyChecksum := true // Default to verify + batchSize := int32(1000) // Default batch size + workingDir := "/tmp/seaweedfs_vacuum_work" // Default working directory + + if vacuumConfig != nil { + garbageThreshold = vacuumConfig.GarbageThreshold + // Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours, MinIntervalSeconds + // Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added + // to the protobuf definition if they should be configurable } - // Check for available workers with vacuum capability - for _, worker := range availableWorkers { - if worker.CurrentLoad < worker.MaxConcurrent { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeVacuum { - return true - } - } - } + // Create typed protobuf parameters + return &worker_pb.TaskParams{ + VolumeId: task.VolumeID, + Server: task.Server, + Collection: task.Collection, + VolumeSize: metric.Size, // Store original volume size for tracking changes + TaskParams: &worker_pb.TaskParams_VacuumParams{ + VacuumParams: &worker_pb.VacuumTaskParams{ + GarbageThreshold: garbageThreshold, + ForceVacuum: false, + BatchSize: batchSize, + WorkingDir: workingDir, + VerifyChecksum: verifyChecksum, + }, + }, } - - return false -} - -// CreateTask creates a new vacuum task instance -func CreateTask(params types.TaskParams) (types.TaskInterface, error) { - // Create and return the vacuum task using existing Task type - return NewTask(params.Server, params.VolumeID), nil } |
