Diffstat (limited to 'weed/worker/tasks')
-rw-r--r--  weed/worker/tasks/balance/balance.go              65
-rw-r--r--  weed/worker/tasks/balance/balance_detector.go    171
-rw-r--r--  weed/worker/tasks/balance/balance_register.go    109
-rw-r--r--  weed/worker/tasks/balance/balance_scheduler.go   197
-rw-r--r--  weed/worker/tasks/balance/balance_typed.go       156
-rw-r--r--  weed/worker/tasks/balance/config.go              170
-rw-r--r--  weed/worker/tasks/balance/detection.go           134
-rw-r--r--  weed/worker/tasks/balance/ui.go                  361
-rw-r--r--  weed/worker/tasks/base/generic_components.go     129
-rw-r--r--  weed/worker/tasks/base/registration.go           155
-rw-r--r--  weed/worker/tasks/base/task_definition.go        272
-rw-r--r--  weed/worker/tasks/base/task_definition_test.go   338
-rw-r--r--  weed/worker/tasks/base/typed_task.go             218
-rw-r--r--  weed/worker/tasks/config_update_registry.go       67
-rw-r--r--  weed/worker/tasks/erasure_coding/config.go       207
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go    140
-rw-r--r--  weed/worker/tasks/erasure_coding/ec.go           792
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_detector.go  139
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_register.go  109
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_scheduler.go 114
-rw-r--r--  weed/worker/tasks/erasure_coding/ui.go           309
-rw-r--r--  weed/worker/tasks/schema_provider.go              51
-rw-r--r--  weed/worker/tasks/task.go                        198
-rw-r--r--  weed/worker/tasks/task_log_handler.go            230
-rw-r--r--  weed/worker/tasks/task_logger.go                 432
-rw-r--r--  weed/worker/tasks/ui_base.go                     184
-rw-r--r--  weed/worker/tasks/vacuum/config.go               190
-rw-r--r--  weed/worker/tasks/vacuum/detection.go            112
-rw-r--r--  weed/worker/tasks/vacuum/ui.go                   314
-rw-r--r--  weed/worker/tasks/vacuum/vacuum.go               195
-rw-r--r--  weed/worker/tasks/vacuum/vacuum_detector.go      132
-rw-r--r--  weed/worker/tasks/vacuum/vacuum_register.go      109
-rw-r--r--  weed/worker/tasks/vacuum/vacuum_scheduler.go     111
33 files changed, 4503 insertions, 2107 deletions
diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go
index ea867d950..0becb3415 100644
--- a/weed/worker/tasks/balance/balance.go
+++ b/weed/worker/tasks/balance/balance.go
@@ -1,6 +1,7 @@
package balance
import (
+ "context"
"fmt"
"time"
@@ -15,6 +16,9 @@ type Task struct {
server string
volumeID uint32
collection string
+
+ // Task parameters for accessing planned destinations
+ taskParams types.TaskParams
}
// NewTask creates a new balance task instance
@@ -30,7 +34,31 @@ func NewTask(server string, volumeID uint32, collection string) *Task {
// Execute executes the balance task
func (t *Task) Execute(params types.TaskParams) error {
- glog.Infof("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection)
+ // Use BaseTask.ExecuteTask to handle logging initialization
+ return t.ExecuteTask(context.Background(), params, t.executeImpl)
+}
+
+// executeImpl is the actual balance implementation
+func (t *Task) executeImpl(ctx context.Context, params types.TaskParams) error {
+ // Store task parameters for accessing planned destinations
+ t.taskParams = params
+
+ // Get planned destination
+ destNode := t.getPlannedDestination()
+ if destNode != "" {
+ t.LogWithFields("INFO", "Starting balance task with planned destination", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "source": t.server,
+ "destination": destNode,
+ "collection": t.collection,
+ })
+ } else {
+ t.LogWithFields("INFO", "Starting balance task without specific destination", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "collection": t.collection,
+ })
+ }
// Simulate balance operation with progress updates
steps := []struct {
@@ -46,18 +74,36 @@ func (t *Task) Execute(params types.TaskParams) error {
}
for _, step := range steps {
+ select {
+ case <-ctx.Done():
+ t.LogWarning("Balance task cancelled during step: %s", step.name)
+ return ctx.Err()
+ default:
+ }
+
if t.IsCancelled() {
+ t.LogWarning("Balance task cancelled by request during step: %s", step.name)
return fmt.Errorf("balance task cancelled")
}
- glog.V(1).Infof("Balance task step: %s", step.name)
+ t.LogWithFields("INFO", "Executing balance step", map[string]interface{}{
+ "step": step.name,
+ "progress": step.progress,
+ "duration": step.duration.String(),
+ "volume_id": t.volumeID,
+ })
t.SetProgress(step.progress)
// Simulate work
time.Sleep(step.duration)
}
- glog.Infof("Balance task completed for volume %d on server %s", t.volumeID, t.server)
+ t.LogWithFields("INFO", "Balance task completed successfully", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "collection": t.collection,
+ "final_progress": 100.0,
+ })
return nil
}
@@ -72,6 +118,19 @@ func (t *Task) Validate(params types.TaskParams) error {
return nil
}
+// getPlannedDestination extracts the planned destination node from task parameters
+func (t *Task) getPlannedDestination() string {
+ if t.taskParams.TypedParams != nil {
+ if balanceParams := t.taskParams.TypedParams.GetBalanceParams(); balanceParams != nil {
+ if balanceParams.DestNode != "" {
+ glog.V(2).Infof("Found planned destination for volume %d: %s", t.volumeID, balanceParams.DestNode)
+ return balanceParams.DestNode
+ }
+ }
+ }
+ return ""
+}
+
// EstimateTime estimates the time needed for the task
func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
// Base time for balance operation
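
For reference, a minimal sketch of driving the reworked Execute path. NewTask and the types.TaskParams fields are taken from the diff above; the concrete values and the surrounding wiring are illustrative assumptions, not repository code.

    // Hypothetical caller: Execute now delegates to ExecuteTask, which sets up
    // task logging and runs executeImpl with a background context.
    task := NewTask("10.0.0.1:8080", 42, "photos")
    params := types.TaskParams{VolumeID: 42, Server: "10.0.0.1:8080", Collection: "photos"}
    if err := task.Execute(params); err != nil {
        glog.Errorf("balance of volume %d failed: %v", 42, err)
    }
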
diff --git a/weed/worker/tasks/balance/balance_detector.go b/weed/worker/tasks/balance/balance_detector.go
deleted file mode 100644
index f082b7a77..000000000
--- a/weed/worker/tasks/balance/balance_detector.go
+++ /dev/null
@@ -1,171 +0,0 @@
-package balance
-
-import (
- "fmt"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// BalanceDetector implements TaskDetector for balance tasks
-type BalanceDetector struct {
- enabled bool
- threshold float64 // Imbalance threshold (0.1 = 10%)
- minCheckInterval time.Duration
- minVolumeCount int
- lastCheck time.Time
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*BalanceDetector)(nil)
-)
-
-// NewBalanceDetector creates a new balance detector
-func NewBalanceDetector() *BalanceDetector {
- return &BalanceDetector{
- enabled: true,
- threshold: 0.1, // 10% imbalance threshold
- minCheckInterval: 1 * time.Hour,
- minVolumeCount: 10, // Don't balance small clusters
- lastCheck: time.Time{},
- }
-}
-
-// GetTaskType returns the task type
-func (d *BalanceDetector) GetTaskType() types.TaskType {
- return types.TaskTypeBalance
-}
-
-// ScanForTasks checks if cluster balance is needed
-func (d *BalanceDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- return nil, nil
- }
-
- glog.V(2).Infof("Scanning for balance tasks...")
-
- // Don't check too frequently
- if time.Since(d.lastCheck) < d.minCheckInterval {
- return nil, nil
- }
- d.lastCheck = time.Now()
-
- // Skip if cluster is too small
- if len(volumeMetrics) < d.minVolumeCount {
- glog.V(2).Infof("Cluster too small for balance (%d volumes < %d minimum)", len(volumeMetrics), d.minVolumeCount)
- return nil, nil
- }
-
- // Analyze volume distribution across servers
- serverVolumeCounts := make(map[string]int)
- for _, metric := range volumeMetrics {
- serverVolumeCounts[metric.Server]++
- }
-
- if len(serverVolumeCounts) < 2 {
- glog.V(2).Infof("Not enough servers for balance (%d servers)", len(serverVolumeCounts))
- return nil, nil
- }
-
- // Calculate balance metrics
- totalVolumes := len(volumeMetrics)
- avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
-
- maxVolumes := 0
- minVolumes := totalVolumes
- maxServer := ""
- minServer := ""
-
- for server, count := range serverVolumeCounts {
- if count > maxVolumes {
- maxVolumes = count
- maxServer = server
- }
- if count < minVolumes {
- minVolumes = count
- minServer = server
- }
- }
-
- // Check if imbalance exceeds threshold
- imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
- if imbalanceRatio <= d.threshold {
- glog.V(2).Infof("Cluster is balanced (imbalance ratio: %.2f <= %.2f)", imbalanceRatio, d.threshold)
- return nil, nil
- }
-
- // Create balance task
- reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
- imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
-
- task := &types.TaskDetectionResult{
- TaskType: types.TaskTypeBalance,
- Priority: types.TaskPriorityNormal,
- Reason: reason,
- ScheduleAt: time.Now(),
- Parameters: map[string]interface{}{
- "imbalance_ratio": imbalanceRatio,
- "threshold": d.threshold,
- "max_volumes": maxVolumes,
- "min_volumes": minVolumes,
- "avg_volumes_per_server": avgVolumesPerServer,
- "max_server": maxServer,
- "min_server": minServer,
- "total_servers": len(serverVolumeCounts),
- },
- }
-
- glog.V(1).Infof("πŸ”„ Found balance task: %s", reason)
- return []*types.TaskDetectionResult{task}, nil
-}
-
-// ScanInterval returns how often to scan
-func (d *BalanceDetector) ScanInterval() time.Duration {
- return d.minCheckInterval
-}
-
-// IsEnabled returns whether the detector is enabled
-func (d *BalanceDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// SetEnabled sets whether the detector is enabled
-func (d *BalanceDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
- glog.V(1).Infof("πŸ”„ Balance detector enabled: %v", enabled)
-}
-
-// SetThreshold sets the imbalance threshold
-func (d *BalanceDetector) SetThreshold(threshold float64) {
- d.threshold = threshold
- glog.V(1).Infof("πŸ”„ Balance threshold set to: %.1f%%", threshold*100)
-}
-
-// SetMinCheckInterval sets the minimum time between balance checks
-func (d *BalanceDetector) SetMinCheckInterval(interval time.Duration) {
- d.minCheckInterval = interval
- glog.V(1).Infof("πŸ”„ Balance check interval set to: %v", interval)
-}
-
-// SetMinVolumeCount sets the minimum volume count for balance operations
-func (d *BalanceDetector) SetMinVolumeCount(count int) {
- d.minVolumeCount = count
- glog.V(1).Infof("πŸ”„ Balance minimum volume count set to: %d", count)
-}
-
-// GetThreshold returns the current imbalance threshold
-func (d *BalanceDetector) GetThreshold() float64 {
- return d.threshold
-}
-
-// GetMinCheckInterval returns the minimum check interval
-func (d *BalanceDetector) GetMinCheckInterval() time.Duration {
- return d.minCheckInterval
-}
-
-// GetMinVolumeCount returns the minimum volume count
-func (d *BalanceDetector) GetMinVolumeCount() int {
- return d.minVolumeCount
-}
diff --git a/weed/worker/tasks/balance/balance_register.go b/weed/worker/tasks/balance/balance_register.go
index 7c2d5a520..b26a40782 100644
--- a/weed/worker/tasks/balance/balance_register.go
+++ b/weed/worker/tasks/balance/balance_register.go
@@ -2,80 +2,71 @@ package balance
import (
"fmt"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
-// Factory creates balance task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
+// Global variable to hold the task definition for configuration updates
+var globalTaskDef *base.TaskDefinition
-// NewFactory creates a new balance task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeBalance,
- []string{"balance", "storage", "optimization"},
- "Balance data across volume servers for optimal performance",
- ),
- }
-}
-
-// Create creates a new balance task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID, params.Collection)
- task.SetEstimatedDuration(task.EstimateTime(params))
+// Auto-register this task when the package is imported
+func init() {
+ RegisterBalanceTask()
- return task, nil
+ // Register config updater
+ tasks.AutoRegisterConfigUpdater(types.TaskTypeBalance, UpdateConfigFromPersistence)
}
-// Shared detector and scheduler instances
-var (
- sharedDetector *BalanceDetector
- sharedScheduler *BalanceScheduler
-)
+// RegisterBalanceTask registers the balance task with the new architecture
+func RegisterBalanceTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*BalanceDetector, *BalanceScheduler) {
- if sharedDetector == nil {
- sharedDetector = NewBalanceDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewBalanceScheduler()
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeBalance,
+ Name: "balance",
+ DisplayName: "Volume Balance",
+ Description: "Balances volume distribution across servers",
+ Icon: "fas fa-balance-scale text-warning",
+ Capabilities: []string{"balance", "distribution"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: CreateTask,
+ DetectionFunc: Detection,
+ ScanInterval: 30 * time.Minute,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 1,
+ RepeatInterval: 2 * time.Hour,
}
- return sharedDetector, sharedScheduler
-}
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*BalanceDetector, *BalanceScheduler) {
- return getSharedInstances()
+ // Store task definition globally for configuration updates
+ globalTaskDef = taskDef
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
-// Auto-register this task when the package is imported
-func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeBalance, factory)
+// UpdateConfigFromPersistence updates the balance configuration from persistence
+func UpdateConfigFromPersistence(configPersistence interface{}) error {
+ if globalTaskDef == nil {
+ return fmt.Errorf("balance task not registered")
+ }
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
+ // Load configuration from persistence
+ newConfig := LoadConfigFromPersistence(configPersistence)
+ if newConfig == nil {
+ return fmt.Errorf("failed to load configuration from persistence")
+ }
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
+ // Update the task definition's config
+ globalTaskDef.Config = newConfig
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ glog.V(1).Infof("Updated balance task configuration from persistence")
+ return nil
}
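
The same registration pattern generalizes to any task package. A sketch under stated assumptions: the field names mirror base.TaskDefinition from this diff, while the task type and values are invented for illustration.

    // Hypothetical package following the balance registration pattern:
    func RegisterMyTask() {
        taskDef := &base.TaskDefinition{
            Type:           types.TaskTypeVacuum, // any registered task type constant
            Name:           "vacuum",
            DisplayName:    "Volume Vacuum",
            Description:    "Reclaims space from deleted entries",
            Config:         NewDefaultConfig(),
            ConfigSpec:     GetConfigSpec(),
            CreateTask:     CreateTask,
            DetectionFunc:  Detection,
            ScanInterval:   30 * time.Minute,
            SchedulingFunc: Scheduling,
            MaxConcurrent:  1,
            RepeatInterval: 2 * time.Hour,
        }
        base.RegisterTask(taskDef)
    }
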
diff --git a/weed/worker/tasks/balance/balance_scheduler.go b/weed/worker/tasks/balance/balance_scheduler.go
deleted file mode 100644
index a8fefe465..000000000
--- a/weed/worker/tasks/balance/balance_scheduler.go
+++ /dev/null
@@ -1,197 +0,0 @@
-package balance
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// BalanceScheduler implements TaskScheduler for balance tasks
-type BalanceScheduler struct {
- enabled bool
- maxConcurrent int
- minInterval time.Duration
- lastScheduled map[string]time.Time // track when we last scheduled a balance for each task type
- minServerCount int
- moveDuringOffHours bool
- offHoursStart string
- offHoursEnd string
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskScheduler = (*BalanceScheduler)(nil)
-)
-
-// NewBalanceScheduler creates a new balance scheduler
-func NewBalanceScheduler() *BalanceScheduler {
- return &BalanceScheduler{
- enabled: true,
- maxConcurrent: 1, // Only run one balance at a time
- minInterval: 6 * time.Hour,
- lastScheduled: make(map[string]time.Time),
- minServerCount: 3,
- moveDuringOffHours: true,
- offHoursStart: "23:00",
- offHoursEnd: "06:00",
- }
-}
-
-// GetTaskType returns the task type
-func (s *BalanceScheduler) GetTaskType() types.TaskType {
- return types.TaskTypeBalance
-}
-
-// CanScheduleNow determines if a balance task can be scheduled
-func (s *BalanceScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- if !s.enabled {
- return false
- }
-
- // Count running balance tasks
- runningBalanceCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeBalance {
- runningBalanceCount++
- }
- }
-
- // Check concurrency limit
- if runningBalanceCount >= s.maxConcurrent {
- glog.V(3).Infof("⏸️ Balance task blocked: too many running (%d >= %d)", runningBalanceCount, s.maxConcurrent)
- return false
- }
-
- // Check minimum interval between balance operations
- if lastTime, exists := s.lastScheduled["balance"]; exists {
- if time.Since(lastTime) < s.minInterval {
- timeLeft := s.minInterval - time.Since(lastTime)
- glog.V(3).Infof("⏸️ Balance task blocked: too soon (wait %v)", timeLeft)
- return false
- }
- }
-
- // Check if we have available workers
- availableWorkerCount := 0
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeBalance {
- availableWorkerCount++
- break
- }
- }
- }
-
- if availableWorkerCount == 0 {
- glog.V(3).Infof("⏸️ Balance task blocked: no available workers")
- return false
- }
-
- // All checks passed - can schedule
- s.lastScheduled["balance"] = time.Now()
- glog.V(2).Infof("βœ… Balance task can be scheduled (running: %d/%d, workers: %d)",
- runningBalanceCount, s.maxConcurrent, availableWorkerCount)
- return true
-}
-
-// GetPriority returns the priority for balance tasks
-func (s *BalanceScheduler) GetPriority(task *types.Task) types.TaskPriority {
- // Balance is typically normal priority - not urgent but important for optimization
- return types.TaskPriorityNormal
-}
-
-// GetMaxConcurrent returns the maximum concurrent balance tasks
-func (s *BalanceScheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating balance tasks
-func (s *BalanceScheduler) GetDefaultRepeatInterval() time.Duration {
- return s.minInterval
-}
-
-// IsEnabled returns whether the scheduler is enabled
-func (s *BalanceScheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// SetEnabled sets whether the scheduler is enabled
-func (s *BalanceScheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
- glog.V(1).Infof("πŸ”„ Balance scheduler enabled: %v", enabled)
-}
-
-// SetMaxConcurrent sets the maximum concurrent balance tasks
-func (s *BalanceScheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
- glog.V(1).Infof("πŸ”„ Balance max concurrent set to: %d", max)
-}
-
-// SetMinInterval sets the minimum interval between balance operations
-func (s *BalanceScheduler) SetMinInterval(interval time.Duration) {
- s.minInterval = interval
- glog.V(1).Infof("πŸ”„ Balance minimum interval set to: %v", interval)
-}
-
-// GetLastScheduled returns when we last scheduled this task type
-func (s *BalanceScheduler) GetLastScheduled(taskKey string) time.Time {
- if lastTime, exists := s.lastScheduled[taskKey]; exists {
- return lastTime
- }
- return time.Time{}
-}
-
-// SetLastScheduled updates when we last scheduled this task type
-func (s *BalanceScheduler) SetLastScheduled(taskKey string, when time.Time) {
- s.lastScheduled[taskKey] = when
-}
-
-// GetMinServerCount returns the minimum server count
-func (s *BalanceScheduler) GetMinServerCount() int {
- return s.minServerCount
-}
-
-// SetMinServerCount sets the minimum server count
-func (s *BalanceScheduler) SetMinServerCount(count int) {
- s.minServerCount = count
- glog.V(1).Infof("πŸ”„ Balance minimum server count set to: %d", count)
-}
-
-// GetMoveDuringOffHours returns whether to move only during off-hours
-func (s *BalanceScheduler) GetMoveDuringOffHours() bool {
- return s.moveDuringOffHours
-}
-
-// SetMoveDuringOffHours sets whether to move only during off-hours
-func (s *BalanceScheduler) SetMoveDuringOffHours(enabled bool) {
- s.moveDuringOffHours = enabled
- glog.V(1).Infof("πŸ”„ Balance move during off-hours: %v", enabled)
-}
-
-// GetOffHoursStart returns the off-hours start time
-func (s *BalanceScheduler) GetOffHoursStart() string {
- return s.offHoursStart
-}
-
-// SetOffHoursStart sets the off-hours start time
-func (s *BalanceScheduler) SetOffHoursStart(start string) {
- s.offHoursStart = start
- glog.V(1).Infof("πŸ”„ Balance off-hours start time set to: %s", start)
-}
-
-// GetOffHoursEnd returns the off-hours end time
-func (s *BalanceScheduler) GetOffHoursEnd() string {
- return s.offHoursEnd
-}
-
-// SetOffHoursEnd sets the off-hours end time
-func (s *BalanceScheduler) SetOffHoursEnd(end string) {
- s.offHoursEnd = end
- glog.V(1).Infof("πŸ”„ Balance off-hours end time set to: %s", end)
-}
-
-// GetMinInterval returns the minimum interval
-func (s *BalanceScheduler) GetMinInterval() time.Duration {
- return s.minInterval
-}
diff --git a/weed/worker/tasks/balance/balance_typed.go b/weed/worker/tasks/balance/balance_typed.go
new file mode 100644
index 000000000..91cd912f0
--- /dev/null
+++ b/weed/worker/tasks/balance/balance_typed.go
@@ -0,0 +1,156 @@
+package balance
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TypedTask implements balance operation with typed protobuf parameters
+type TypedTask struct {
+ *base.BaseTypedTask
+
+ // Task state from protobuf
+ sourceServer string
+ destNode string
+ volumeID uint32
+ collection string
+ estimatedSize uint64
+ placementScore float64
+ forceMove bool
+ timeoutSeconds int32
+ placementConflicts []string
+}
+
+// NewTypedTask creates a new typed balance task
+func NewTypedTask() types.TypedTaskInterface {
+ task := &TypedTask{
+ BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeBalance),
+ }
+ return task
+}
+
+// ValidateTyped validates the typed parameters for balance task
+func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
+ // Basic validation from base class
+ if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
+ return err
+ }
+
+ // Check that we have balance-specific parameters
+ balanceParams := params.GetBalanceParams()
+ if balanceParams == nil {
+ return fmt.Errorf("balance_params is required for balance task")
+ }
+
+ // Validate destination node
+ if balanceParams.DestNode == "" {
+ return fmt.Errorf("dest_node is required for balance task")
+ }
+
+ // Validate estimated size
+ if balanceParams.EstimatedSize == 0 {
+ return fmt.Errorf("estimated_size must be greater than 0")
+ }
+
+ // Validate timeout
+ if balanceParams.TimeoutSeconds <= 0 {
+ return fmt.Errorf("timeout_seconds must be greater than 0")
+ }
+
+ return nil
+}
+
+// EstimateTimeTyped estimates the time needed for balance operation based on protobuf parameters
+func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
+ balanceParams := params.GetBalanceParams()
+ if balanceParams != nil {
+ // Use the timeout from parameters if specified
+ if balanceParams.TimeoutSeconds > 0 {
+ return time.Duration(balanceParams.TimeoutSeconds) * time.Second
+ }
+
+ // Estimate based on volume size (1 minute per GB)
+ if balanceParams.EstimatedSize > 0 {
+ gbSize := balanceParams.EstimatedSize / (1024 * 1024 * 1024)
+ return time.Duration(gbSize) * time.Minute
+ }
+ }
+
+ // Default estimation
+ return 10 * time.Minute
+}
+
+// ExecuteTyped implements the balance operation with typed parameters
+func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
+ // Extract basic parameters
+ t.volumeID = params.VolumeId
+ t.sourceServer = params.Server
+ t.collection = params.Collection
+
+ // Extract balance-specific parameters
+ balanceParams := params.GetBalanceParams()
+ if balanceParams != nil {
+ t.destNode = balanceParams.DestNode
+ t.estimatedSize = balanceParams.EstimatedSize
+ t.placementScore = balanceParams.PlacementScore
+ t.forceMove = balanceParams.ForceMove
+ t.timeoutSeconds = balanceParams.TimeoutSeconds
+ t.placementConflicts = balanceParams.PlacementConflicts
+ }
+
+ glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)",
+ t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize)
+
+ // Log placement information
+ if t.placementScore > 0 {
+ glog.V(1).Infof("Placement score: %.2f", t.placementScore)
+ }
+ if len(t.placementConflicts) > 0 {
+ glog.V(1).Infof("Placement conflicts: %v", t.placementConflicts)
+ if !t.forceMove {
+ return fmt.Errorf("placement conflicts detected and force_move is false: %v", t.placementConflicts)
+ }
+ glog.Warningf("Proceeding with balance despite conflicts (force_move=true): %v", t.placementConflicts)
+ }
+
+ // Simulate balance operation with progress updates
+ steps := []struct {
+ name string
+ duration time.Duration
+ progress float64
+ }{
+ {"Analyzing cluster state", 2 * time.Second, 15},
+ {"Verifying destination capacity", 1 * time.Second, 25},
+ {"Starting volume migration", 1 * time.Second, 35},
+ {"Moving volume data", 6 * time.Second, 75},
+ {"Updating cluster metadata", 2 * time.Second, 95},
+ {"Verifying balance completion", 1 * time.Second, 100},
+ }
+
+ for _, step := range steps {
+ if t.IsCancelled() {
+ return fmt.Errorf("balance task cancelled during: %s", step.name)
+ }
+
+ glog.V(1).Infof("Balance task step: %s", step.name)
+ t.SetProgress(step.progress)
+
+ // Simulate work
+ time.Sleep(step.duration)
+ }
+
+ glog.Infof("Typed balance task completed successfully for volume %d: %s -> %s",
+ t.volumeID, t.sourceServer, t.destNode)
+ return nil
+}
+
+// Register the typed task in the global registry
+func init() {
+ types.RegisterGlobalTypedTask(types.TaskTypeBalance, NewTypedTask)
+ glog.V(1).Infof("Registered typed balance task")
+}
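
A sketch of exercising the typed task directly. The GetBalanceParams getter and the DestNode/EstimatedSize/TimeoutSeconds fields appear in this diff, but the generated oneof wrapper and message names (TaskParams_BalanceParams, BalanceTaskParams) are assumptions; check worker_pb for the actual identifiers.

    params := &worker_pb.TaskParams{
        VolumeId:   42,
        Server:     "10.0.0.1:8080",
        Collection: "photos",
        // Oneof wrapper name below is assumed from the GetBalanceParams() getter.
        TaskParams: &worker_pb.TaskParams_BalanceParams{
            BalanceParams: &worker_pb.BalanceTaskParams{
                DestNode:       "10.0.0.2:8080",
                EstimatedSize:  1 << 30, // 1 GiB
                TimeoutSeconds: 600,
            },
        },
    }
    task := NewTypedTask()
    if err := task.ValidateTyped(params); err == nil {
        err = task.ExecuteTyped(params)
    }
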
diff --git a/weed/worker/tasks/balance/config.go b/weed/worker/tasks/balance/config.go
new file mode 100644
index 000000000..9303b4b2a
--- /dev/null
+++ b/weed/worker/tasks/balance/config.go
@@ -0,0 +1,170 @@
+package balance
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with balance-specific settings
+type Config struct {
+ base.BaseConfig
+ ImbalanceThreshold float64 `json:"imbalance_threshold"`
+ MinServerCount int `json:"min_server_count"`
+}
+
+// NewDefaultConfig creates a new default balance configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 30 * 60, // 30 minutes
+ MaxConcurrent: 1,
+ },
+ ImbalanceThreshold: 0.2, // 20%
+ MinServerCount: 2,
+ }
+}
+
+// GetConfigSpec returns the configuration schema for balance tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Balance Tasks",
+ Description: "Whether balance tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic balance task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 30 * 60,
+ MinValue: 5 * 60,
+ MaxValue: 2 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volume distribution imbalances",
+ HelpText: "The system will check for volume distribution imbalances at this interval",
+ Placeholder: "30",
+ Unit: config.UnitMinutes,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 1,
+ MinValue: 1,
+ MaxValue: 3,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of balance tasks that can run simultaneously",
+ HelpText: "Limits the number of balance operations running at the same time",
+ Placeholder: "1 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "imbalance_threshold",
+ JSONName: "imbalance_threshold",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.2,
+ MinValue: 0.05,
+ MaxValue: 0.5,
+ Required: true,
+ DisplayName: "Imbalance Threshold",
+ Description: "Minimum imbalance ratio to trigger balancing",
+ HelpText: "Volume distribution imbalances above this threshold will trigger balancing",
+ Placeholder: "0.20 (20%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_server_count",
+ JSONName: "min_server_count",
+ Type: config.FieldTypeInt,
+ DefaultValue: 2,
+ MinValue: 2,
+ MaxValue: 10,
+ Required: true,
+ DisplayName: "Minimum Server Count",
+ Description: "Minimum number of servers required for balancing",
+ HelpText: "Balancing will only occur if there are at least this many servers",
+ Placeholder: "2 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
+
+// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
+func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+ BalanceConfig: &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: float64(c.ImbalanceThreshold),
+ MinServerCount: int32(c.MinServerCount),
+ },
+ },
+ }
+}
+
+// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
+func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+
+ // Set general TaskPolicy fields
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
+
+ // Set balance-specific fields from the task config
+ if balanceConfig := policy.GetBalanceConfig(); balanceConfig != nil {
+ c.ImbalanceThreshold = float64(balanceConfig.ImbalanceThreshold)
+ c.MinServerCount = int(balanceConfig.MinServerCount)
+ }
+
+ return nil
+}
+
+// LoadConfigFromPersistence loads configuration from the persistence layer if available
+func LoadConfigFromPersistence(configPersistence interface{}) *Config {
+ config := NewDefaultConfig()
+
+ // Try to load from persistence if available
+ if persistence, ok := configPersistence.(interface {
+ LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error)
+ }); ok {
+ if policy, err := persistence.LoadBalanceTaskPolicy(); err == nil && policy != nil {
+ if err := config.FromTaskPolicy(policy); err == nil {
+ glog.V(1).Infof("Loaded balance configuration from persistence")
+ return config
+ }
+ }
+ }
+
+ glog.V(1).Infof("Using default balance configuration")
+ return config
+}
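
A sketch of the config/policy round trip the persistence layer relies on; only functions defined in this file are used, and the values are examples.

    cfg := NewDefaultConfig()
    cfg.ImbalanceThreshold = 0.25
    policy := cfg.ToTaskPolicy() // *worker_pb.TaskPolicy carrying a BalanceTaskConfig

    restored := NewDefaultConfig()
    if err := restored.FromTaskPolicy(policy); err == nil {
        // restored.ImbalanceThreshold == 0.25; Enabled and MaxConcurrent carry over too.
    }
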
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
new file mode 100644
index 000000000..f4bcf3ca3
--- /dev/null
+++ b/weed/worker/tasks/balance/detection.go
@@ -0,0 +1,134 @@
+package balance
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for balance tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ balanceConfig := config.(*Config)
+
+ // Skip if cluster is too small
+ minVolumeCount := 2 // More reasonable for small clusters
+ if len(metrics) < minVolumeCount {
+ glog.Infof("BALANCE: No tasks created - cluster too small (%d volumes, need β‰₯%d)", len(metrics), minVolumeCount)
+ return nil, nil
+ }
+
+ // Analyze volume distribution across servers
+ serverVolumeCounts := make(map[string]int)
+ for _, metric := range metrics {
+ serverVolumeCounts[metric.Server]++
+ }
+
+ if len(serverVolumeCounts) < balanceConfig.MinServerCount {
+ glog.Infof("BALANCE: No tasks created - too few servers (%d servers, need β‰₯%d)", len(serverVolumeCounts), balanceConfig.MinServerCount)
+ return nil, nil
+ }
+
+ // Calculate balance metrics
+ totalVolumes := len(metrics)
+ avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
+
+ maxVolumes := 0
+ minVolumes := totalVolumes
+ maxServer := ""
+ minServer := ""
+
+ for server, count := range serverVolumeCounts {
+ if count > maxVolumes {
+ maxVolumes = count
+ maxServer = server
+ }
+ if count < minVolumes {
+ minVolumes = count
+ minServer = server
+ }
+ }
+
+ // Check if imbalance exceeds threshold
+ imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
+ if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
+ glog.Infof("BALANCE: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
+ imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+ return nil, nil
+ }
+
+ // Select a volume from the overloaded server for balance
+ var selectedVolume *types.VolumeHealthMetrics
+ for _, metric := range metrics {
+ if metric.Server == maxServer {
+ selectedVolume = metric
+ break
+ }
+ }
+
+ if selectedVolume == nil {
+ glog.Warningf("BALANCE: Could not find volume on overloaded server %s", maxServer)
+ return nil, nil
+ }
+
+ // Create balance task with volume and destination planning info
+ reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
+ imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+
+ task := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeBalance,
+ VolumeID: selectedVolume.VolumeID,
+ Server: selectedVolume.Server,
+ Collection: selectedVolume.Collection,
+ Priority: types.TaskPriorityNormal,
+ Reason: reason,
+ ScheduleAt: time.Now(),
+ // TypedParams will be populated by the maintenance integration
+ // with destination planning information
+ }
+
+ return []*types.TaskDetectionResult{task}, nil
+}
+
+// Scheduling implements the scheduling logic for balance tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ balanceConfig := config.(*Config)
+
+ // Count running balance tasks
+ runningBalanceCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeBalance {
+ runningBalanceCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningBalanceCount >= balanceConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check if we have available workers
+ availableWorkerCount := 0
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeBalance {
+ availableWorkerCount++
+ break
+ }
+ }
+ }
+
+ return availableWorkerCount > 0
+}
+
+// CreateTask creates a new balance task instance
+func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Create and return the balance task using existing Task type
+ return NewTask(params.Server, params.VolumeID, params.Collection), nil
+}
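
The threshold test above reduces to simple arithmetic; a worked example with invented counts:

    // Three servers holding 10, 4, and 1 volumes (illustrative numbers):
    avg := float64(10+4+1) / 3.0 // 5.0 volumes per server
    ratio := float64(10-1) / avg // (max - min) / avg = 1.8
    _ = ratio > 0.2              // true: 180% imbalance clears the 20% default threshold
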
diff --git a/weed/worker/tasks/balance/ui.go b/weed/worker/tasks/balance/ui.go
deleted file mode 100644
index 2cea20a76..000000000
--- a/weed/worker/tasks/balance/ui.go
+++ /dev/null
@@ -1,361 +0,0 @@
-package balance
-
-import (
- "fmt"
- "html/template"
- "strconv"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// UIProvider provides the UI for balance task configuration
-type UIProvider struct {
- detector *BalanceDetector
- scheduler *BalanceScheduler
-}
-
-// NewUIProvider creates a new balance UI provider
-func NewUIProvider(detector *BalanceDetector, scheduler *BalanceScheduler) *UIProvider {
- return &UIProvider{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetTaskType returns the task type
-func (ui *UIProvider) GetTaskType() types.TaskType {
- return types.TaskTypeBalance
-}
-
-// GetDisplayName returns the human-readable name
-func (ui *UIProvider) GetDisplayName() string {
- return "Volume Balance"
-}
-
-// GetDescription returns a description of what this task does
-func (ui *UIProvider) GetDescription() string {
- return "Redistributes volumes across volume servers to optimize storage utilization and performance"
-}
-
-// GetIcon returns the icon CSS class for this task type
-func (ui *UIProvider) GetIcon() string {
- return "fas fa-balance-scale text-secondary"
-}
-
-// BalanceConfig represents the balance configuration
-type BalanceConfig struct {
- Enabled bool `json:"enabled"`
- ImbalanceThreshold float64 `json:"imbalance_threshold"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- MinServerCount int `json:"min_server_count"`
- MoveDuringOffHours bool `json:"move_during_off_hours"`
- OffHoursStart string `json:"off_hours_start"`
- OffHoursEnd string `json:"off_hours_end"`
- MinIntervalSeconds int `json:"min_interval_seconds"`
-}
-
-// Helper functions for duration conversion
-func secondsToDuration(seconds int) time.Duration {
- return time.Duration(seconds) * time.Second
-}
-
-func durationToSeconds(d time.Duration) int {
- return int(d.Seconds())
-}
-
-// formatDurationForUser formats seconds as a user-friendly duration string
-func formatDurationForUser(seconds int) string {
- d := secondsToDuration(seconds)
- if d < time.Minute {
- return fmt.Sprintf("%ds", seconds)
- }
- if d < time.Hour {
- return fmt.Sprintf("%.0fm", d.Minutes())
- }
- if d < 24*time.Hour {
- return fmt.Sprintf("%.1fh", d.Hours())
- }
- return fmt.Sprintf("%.1fd", d.Hours()/24)
-}
-
-// RenderConfigForm renders the configuration form HTML
-func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) {
- config := ui.getCurrentBalanceConfig()
-
- // Build form using the FormBuilder helper
- form := types.NewFormBuilder()
-
- // Detection Settings
- form.AddCheckboxField(
- "enabled",
- "Enable Balance Tasks",
- "Whether balance tasks should be automatically created",
- config.Enabled,
- )
-
- form.AddNumberField(
- "imbalance_threshold",
- "Imbalance Threshold (%)",
- "Trigger balance when storage imbalance exceeds this percentage (0.0-1.0)",
- config.ImbalanceThreshold,
- true,
- )
-
- form.AddDurationField("scan_interval", "Scan Interval", "How often to scan for imbalanced volumes", secondsToDuration(config.ScanIntervalSeconds), true)
-
- // Scheduling Settings
- form.AddNumberField(
- "max_concurrent",
- "Max Concurrent Tasks",
- "Maximum number of balance tasks that can run simultaneously",
- float64(config.MaxConcurrent),
- true,
- )
-
- form.AddNumberField(
- "min_server_count",
- "Minimum Server Count",
- "Only balance when at least this many servers are available",
- float64(config.MinServerCount),
- true,
- )
-
- // Timing Settings
- form.AddCheckboxField(
- "move_during_off_hours",
- "Restrict to Off-Hours",
- "Only perform balance operations during off-peak hours",
- config.MoveDuringOffHours,
- )
-
- form.AddTextField(
- "off_hours_start",
- "Off-Hours Start Time",
- "Start time for off-hours window (e.g., 23:00)",
- config.OffHoursStart,
- false,
- )
-
- form.AddTextField(
- "off_hours_end",
- "Off-Hours End Time",
- "End time for off-hours window (e.g., 06:00)",
- config.OffHoursEnd,
- false,
- )
-
- // Timing constraints
- form.AddDurationField("min_interval", "Min Interval", "Minimum time between balance operations", secondsToDuration(config.MinIntervalSeconds), true)
-
- // Generate organized form sections using Bootstrap components
- html := `
-<div class="row">
- <div class="col-12">
- <div class="card mb-4">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-balance-scale me-2"></i>
- Balance Configuration
- </h5>
- </div>
- <div class="card-body">
-` + string(form.Build()) + `
- </div>
- </div>
- </div>
-</div>
-
-<div class="row">
- <div class="col-12">
- <div class="card mb-3">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-exclamation-triangle me-2"></i>
- Performance Considerations
- </h5>
- </div>
- <div class="card-body">
- <div class="alert alert-warning" role="alert">
- <h6 class="alert-heading">Important Considerations:</h6>
- <p class="mb-2"><strong>Performance:</strong> Volume balancing involves data movement and can impact cluster performance.</p>
- <p class="mb-2"><strong>Recommendation:</strong> Enable off-hours restriction to minimize impact on production workloads.</p>
- <p class="mb-0"><strong>Safety:</strong> Requires at least ` + fmt.Sprintf("%d", config.MinServerCount) + ` servers to ensure data safety during moves.</p>
- </div>
- </div>
- </div>
- </div>
-</div>`
-
- return template.HTML(html), nil
-}
-
-// ParseConfigForm parses form data into configuration
-func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) {
- config := &BalanceConfig{}
-
- // Parse enabled
- config.Enabled = len(formData["enabled"]) > 0
-
- // Parse imbalance threshold
- if values, ok := formData["imbalance_threshold"]; ok && len(values) > 0 {
- threshold, err := strconv.ParseFloat(values[0], 64)
- if err != nil {
- return nil, fmt.Errorf("invalid imbalance threshold: %w", err)
- }
- if threshold < 0 || threshold > 1 {
- return nil, fmt.Errorf("imbalance threshold must be between 0.0 and 1.0")
- }
- config.ImbalanceThreshold = threshold
- }
-
- // Parse scan interval
- if values, ok := formData["scan_interval"]; ok && len(values) > 0 {
- duration, err := time.ParseDuration(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid scan interval: %w", err)
- }
- config.ScanIntervalSeconds = int(duration.Seconds())
- }
-
- // Parse max concurrent
- if values, ok := formData["max_concurrent"]; ok && len(values) > 0 {
- maxConcurrent, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid max concurrent: %w", err)
- }
- if maxConcurrent < 1 {
- return nil, fmt.Errorf("max concurrent must be at least 1")
- }
- config.MaxConcurrent = maxConcurrent
- }
-
- // Parse min server count
- if values, ok := formData["min_server_count"]; ok && len(values) > 0 {
- minServerCount, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid min server count: %w", err)
- }
- if minServerCount < 2 {
- return nil, fmt.Errorf("min server count must be at least 2")
- }
- config.MinServerCount = minServerCount
- }
-
- // Parse off-hours settings
- config.MoveDuringOffHours = len(formData["move_during_off_hours"]) > 0
-
- if values, ok := formData["off_hours_start"]; ok && len(values) > 0 {
- config.OffHoursStart = values[0]
- }
-
- if values, ok := formData["off_hours_end"]; ok && len(values) > 0 {
- config.OffHoursEnd = values[0]
- }
-
- // Parse min interval
- if values, ok := formData["min_interval"]; ok && len(values) > 0 {
- duration, err := time.ParseDuration(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid min interval: %w", err)
- }
- config.MinIntervalSeconds = int(duration.Seconds())
- }
-
- return config, nil
-}
-
-// GetCurrentConfig returns the current configuration
-func (ui *UIProvider) GetCurrentConfig() interface{} {
- return ui.getCurrentBalanceConfig()
-}
-
-// ApplyConfig applies the new configuration
-func (ui *UIProvider) ApplyConfig(config interface{}) error {
- balanceConfig, ok := config.(*BalanceConfig)
- if !ok {
- return fmt.Errorf("invalid config type, expected *BalanceConfig")
- }
-
- // Apply to detector
- if ui.detector != nil {
- ui.detector.SetEnabled(balanceConfig.Enabled)
- ui.detector.SetThreshold(balanceConfig.ImbalanceThreshold)
- ui.detector.SetMinCheckInterval(secondsToDuration(balanceConfig.ScanIntervalSeconds))
- }
-
- // Apply to scheduler
- if ui.scheduler != nil {
- ui.scheduler.SetEnabled(balanceConfig.Enabled)
- ui.scheduler.SetMaxConcurrent(balanceConfig.MaxConcurrent)
- ui.scheduler.SetMinServerCount(balanceConfig.MinServerCount)
- ui.scheduler.SetMoveDuringOffHours(balanceConfig.MoveDuringOffHours)
- ui.scheduler.SetOffHoursStart(balanceConfig.OffHoursStart)
- ui.scheduler.SetOffHoursEnd(balanceConfig.OffHoursEnd)
- }
-
- glog.V(1).Infof("Applied balance configuration: enabled=%v, threshold=%.1f%%, max_concurrent=%d, min_servers=%d, off_hours=%v",
- balanceConfig.Enabled, balanceConfig.ImbalanceThreshold*100, balanceConfig.MaxConcurrent,
- balanceConfig.MinServerCount, balanceConfig.MoveDuringOffHours)
-
- return nil
-}
-
-// getCurrentBalanceConfig gets the current configuration from detector and scheduler
-func (ui *UIProvider) getCurrentBalanceConfig() *BalanceConfig {
- config := &BalanceConfig{
- // Default values (fallback if detectors/schedulers are nil)
- Enabled: true,
- ImbalanceThreshold: 0.1, // 10% imbalance
- ScanIntervalSeconds: durationToSeconds(4 * time.Hour),
- MaxConcurrent: 1,
- MinServerCount: 3,
- MoveDuringOffHours: true,
- OffHoursStart: "23:00",
- OffHoursEnd: "06:00",
- MinIntervalSeconds: durationToSeconds(1 * time.Hour),
- }
-
- // Get current values from detector
- if ui.detector != nil {
- config.Enabled = ui.detector.IsEnabled()
- config.ImbalanceThreshold = ui.detector.GetThreshold()
- config.ScanIntervalSeconds = int(ui.detector.ScanInterval().Seconds())
- }
-
- // Get current values from scheduler
- if ui.scheduler != nil {
- config.MaxConcurrent = ui.scheduler.GetMaxConcurrent()
- config.MinServerCount = ui.scheduler.GetMinServerCount()
- config.MoveDuringOffHours = ui.scheduler.GetMoveDuringOffHours()
- config.OffHoursStart = ui.scheduler.GetOffHoursStart()
- config.OffHoursEnd = ui.scheduler.GetOffHoursEnd()
- }
-
- return config
-}
-
-// RegisterUI registers the balance UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *BalanceDetector, scheduler *BalanceScheduler) {
- uiProvider := NewUIProvider(detector, scheduler)
- uiRegistry.RegisterUI(uiProvider)
-
- glog.V(1).Infof("βœ… Registered balance task UI provider")
-}
-
-// DefaultBalanceConfig returns default balance configuration
-func DefaultBalanceConfig() *BalanceConfig {
- return &BalanceConfig{
- Enabled: false,
- ImbalanceThreshold: 0.3,
- ScanIntervalSeconds: durationToSeconds(4 * time.Hour),
- MaxConcurrent: 1,
- MinServerCount: 3,
- MoveDuringOffHours: false,
- OffHoursStart: "22:00",
- OffHoursEnd: "06:00",
- MinIntervalSeconds: durationToSeconds(1 * time.Hour),
- }
-}
diff --git a/weed/worker/tasks/base/generic_components.go b/weed/worker/tasks/base/generic_components.go
new file mode 100644
index 000000000..27ad1bb29
--- /dev/null
+++ b/weed/worker/tasks/base/generic_components.go
@@ -0,0 +1,129 @@
+package base
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// GenericDetector implements TaskDetector using function-based logic
+type GenericDetector struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericDetector creates a detector from a task definition
+func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector {
+ return &GenericDetector{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (d *GenericDetector) GetTaskType() types.TaskType {
+ return d.taskDef.Type
+}
+
+// ScanForTasks scans using the task definition's detection function
+func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
+ if d.taskDef.DetectionFunc == nil {
+ return nil, nil
+ }
+ return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config)
+}
+
+// ScanInterval returns the scan interval from task definition
+func (d *GenericDetector) ScanInterval() time.Duration {
+ if d.taskDef.ScanInterval > 0 {
+ return d.taskDef.ScanInterval
+ }
+ return 30 * time.Minute // Default
+}
+
+// IsEnabled returns whether this detector is enabled
+func (d *GenericDetector) IsEnabled() bool {
+ return d.taskDef.Config.IsEnabled()
+}
+
+// GenericScheduler implements TaskScheduler using function-based logic
+type GenericScheduler struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericScheduler creates a scheduler from a task definition
+func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler {
+ return &GenericScheduler{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (s *GenericScheduler) GetTaskType() types.TaskType {
+ return s.taskDef.Type
+}
+
+// CanScheduleNow determines if a task can be scheduled using the task definition's function
+func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if s.taskDef.SchedulingFunc == nil {
+ return s.defaultCanSchedule(task, runningTasks, availableWorkers)
+ }
+ return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config)
+}
+
+// defaultCanSchedule provides default scheduling logic
+func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if !s.taskDef.Config.IsEnabled() {
+ return false
+ }
+
+ // Count running tasks of this type
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == s.taskDef.Type {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ maxConcurrent := s.taskDef.MaxConcurrent
+ if maxConcurrent <= 0 {
+ maxConcurrent = 1 // Default
+ }
+ if runningCount >= maxConcurrent {
+ return false
+ }
+
+ // Check if we have available workers
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == s.taskDef.Type {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// GetPriority returns the priority for this task
+func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority {
+ return task.Priority
+}
+
+// GetMaxConcurrent returns max concurrent tasks
+func (s *GenericScheduler) GetMaxConcurrent() int {
+ if s.taskDef.MaxConcurrent > 0 {
+ return s.taskDef.MaxConcurrent
+ }
+ return 1 // Default
+}
+
+// GetDefaultRepeatInterval returns the default repeat interval
+func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration {
+ if s.taskDef.RepeatInterval > 0 {
+ return s.taskDef.RepeatInterval
+ }
+ return 24 * time.Hour // Default
+}
+
+// IsEnabled returns whether this scheduler is enabled
+func (s *GenericScheduler) IsEnabled() bool {
+ return s.taskDef.Config.IsEnabled()
+}
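
Both generic components read their behavior from the same TaskDefinition, so detection and scheduling stay consistent with one config. A sketch, assuming taskDef, volumeMetrics, clusterInfo, runningTasks, and availableWorkers are already in scope:

    detector := NewGenericDetector(taskDef)
    scheduler := NewGenericScheduler(taskDef)
    results, _ := detector.ScanForTasks(volumeMetrics, clusterInfo)
    for _, r := range results {
        candidate := &types.Task{Type: r.TaskType, Priority: r.Priority}
        if scheduler.CanScheduleNow(candidate, runningTasks, availableWorkers) {
            // dispatch candidate to a capable worker
        }
    }
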
diff --git a/weed/worker/tasks/base/registration.go b/weed/worker/tasks/base/registration.go
new file mode 100644
index 000000000..416b6f6b8
--- /dev/null
+++ b/weed/worker/tasks/base/registration.go
@@ -0,0 +1,155 @@
+package base
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// GenericFactory creates task instances using a TaskDefinition
+type GenericFactory struct {
+ *tasks.BaseTaskFactory
+ taskDef *TaskDefinition
+}
+
+// NewGenericFactory creates a generic task factory
+func NewGenericFactory(taskDef *TaskDefinition) *GenericFactory {
+ return &GenericFactory{
+ BaseTaskFactory: tasks.NewBaseTaskFactory(
+ taskDef.Type,
+ taskDef.Capabilities,
+ taskDef.Description,
+ ),
+ taskDef: taskDef,
+ }
+}
+
+// Create creates a task instance using the task definition
+func (f *GenericFactory) Create(params types.TaskParams) (types.TaskInterface, error) {
+ if f.taskDef.CreateTask == nil {
+ return nil, fmt.Errorf("no task creation function defined for %s", f.taskDef.Type)
+ }
+ return f.taskDef.CreateTask(params)
+}
+
+// GenericSchemaProvider provides config schema from TaskDefinition
+type GenericSchemaProvider struct {
+ taskDef *TaskDefinition
+}
+
+// GetConfigSchema returns the schema from task definition
+func (p *GenericSchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
+ return &tasks.TaskConfigSchema{
+ TaskName: string(p.taskDef.Type),
+ DisplayName: p.taskDef.DisplayName,
+ Description: p.taskDef.Description,
+ Icon: p.taskDef.Icon,
+ Schema: config.Schema{
+ Fields: p.taskDef.ConfigSpec.Fields,
+ },
+ }
+}
+
+// GenericUIProvider provides UI functionality from TaskDefinition
+type GenericUIProvider struct {
+ taskDef *TaskDefinition
+}
+
+// GetTaskType returns the task type
+func (ui *GenericUIProvider) GetTaskType() types.TaskType {
+ return ui.taskDef.Type
+}
+
+// GetDisplayName returns the human-readable name
+func (ui *GenericUIProvider) GetDisplayName() string {
+ return ui.taskDef.DisplayName
+}
+
+// GetDescription returns a description of what this task does
+func (ui *GenericUIProvider) GetDescription() string {
+ return ui.taskDef.Description
+}
+
+// GetIcon returns the icon CSS class for this task type
+func (ui *GenericUIProvider) GetIcon() string {
+ return ui.taskDef.Icon
+}
+
+// GetCurrentConfig returns current config as TaskConfig
+func (ui *GenericUIProvider) GetCurrentConfig() types.TaskConfig {
+ return ui.taskDef.Config
+}
+
+// ApplyTaskPolicy applies protobuf TaskPolicy configuration
+func (ui *GenericUIProvider) ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return ui.taskDef.Config.FromTaskPolicy(policy)
+}
+
+// ApplyTaskConfig applies TaskConfig interface configuration
+func (ui *GenericUIProvider) ApplyTaskConfig(config types.TaskConfig) error {
+ taskPolicy := config.ToTaskPolicy()
+ return ui.taskDef.Config.FromTaskPolicy(taskPolicy)
+}
+
+// RegisterTask registers a complete task definition with all registries
+func RegisterTask(taskDef *TaskDefinition) {
+ // Validate task definition
+ if err := validateTaskDefinition(taskDef); err != nil {
+ glog.Errorf("Invalid task definition for %s: %v", taskDef.Type, err)
+ return
+ }
+
+ // Create and register factory
+ factory := NewGenericFactory(taskDef)
+ tasks.AutoRegister(taskDef.Type, factory)
+
+ // Create and register detector/scheduler
+ detector := NewGenericDetector(taskDef)
+ scheduler := NewGenericScheduler(taskDef)
+
+ tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
+ registry.RegisterTask(detector, scheduler)
+ })
+
+ // Create and register schema provider
+ schemaProvider := &GenericSchemaProvider{taskDef: taskDef}
+ tasks.RegisterTaskConfigSchema(string(taskDef.Type), schemaProvider)
+
+ // Create and register UI provider
+ uiProvider := &GenericUIProvider{taskDef: taskDef}
+ tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
+ baseUIProvider := tasks.NewBaseUIProvider(
+ taskDef.Type,
+ taskDef.DisplayName,
+ taskDef.Description,
+ taskDef.Icon,
+ schemaProvider.GetConfigSchema,
+ uiProvider.GetCurrentConfig,
+ uiProvider.ApplyTaskPolicy,
+ uiProvider.ApplyTaskConfig,
+ )
+ uiRegistry.RegisterUI(baseUIProvider)
+ })
+
+ glog.V(1).Infof("βœ… Registered complete task definition: %s", taskDef.Type)
+}
+
+// validateTaskDefinition ensures the task definition is complete
+func validateTaskDefinition(taskDef *TaskDefinition) error {
+ if taskDef.Type == "" {
+ return fmt.Errorf("task type is required")
+ }
+ if taskDef.Name == "" {
+ return fmt.Errorf("task name is required")
+ }
+ if taskDef.Config == nil {
+ return fmt.Errorf("task config is required")
+ }
+ // CreateTask is optional for tasks that use the typed task system
+ // The typed system registers tasks separately via types.RegisterGlobalTypedTask()
+ return nil
+}
diff --git a/weed/worker/tasks/base/task_definition.go b/weed/worker/tasks/base/task_definition.go
new file mode 100644
index 000000000..6689d9c81
--- /dev/null
+++ b/weed/worker/tasks/base/task_definition.go
@@ -0,0 +1,272 @@
+package base
+
+import (
+ "fmt"
+ "reflect"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TaskDefinition encapsulates everything needed to define a complete task type
+type TaskDefinition struct {
+ // Basic task information
+ Type types.TaskType
+ Name string
+ DisplayName string
+ Description string
+ Icon string
+ Capabilities []string
+
+ // Task configuration
+ Config TaskConfig
+ ConfigSpec ConfigSpec
+
+ // Task creation
+ CreateTask func(params types.TaskParams) (types.TaskInterface, error)
+
+ // Detection logic
+ DetectionFunc func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config TaskConfig) ([]*types.TaskDetectionResult, error)
+ ScanInterval time.Duration
+
+ // Scheduling logic
+ SchedulingFunc func(task *types.Task, running []*types.Task, workers []*types.Worker, config TaskConfig) bool
+ MaxConcurrent int
+ RepeatInterval time.Duration
+}
+
+// TaskConfig provides a configuration interface that supports type-safe defaults
+type TaskConfig interface {
+ config.ConfigWithDefaults // Extends ConfigWithDefaults for type-safe schema operations
+ IsEnabled() bool
+ SetEnabled(bool)
+ ToTaskPolicy() *worker_pb.TaskPolicy
+ FromTaskPolicy(policy *worker_pb.TaskPolicy) error
+}
+
+// ConfigSpec defines the configuration schema
+type ConfigSpec struct {
+ Fields []*config.Field
+}
+
+// BaseConfig provides common configuration fields with reflection-based serialization
+type BaseConfig struct {
+ Enabled bool `json:"enabled"`
+ ScanIntervalSeconds int `json:"scan_interval_seconds"`
+ MaxConcurrent int `json:"max_concurrent"`
+}
+
+// IsEnabled returns whether the task is enabled
+func (c *BaseConfig) IsEnabled() bool {
+ return c.Enabled
+}
+
+// SetEnabled sets whether the task is enabled
+func (c *BaseConfig) SetEnabled(enabled bool) {
+ c.Enabled = enabled
+}
+
+// Validate validates the base configuration
+func (c *BaseConfig) Validate() error {
+ // Common validation logic
+ return nil
+}
+
+// StructToMap converts any struct to a map using reflection
+func StructToMap(obj interface{}) map[string]interface{} {
+ result := make(map[string]interface{})
+ val := reflect.ValueOf(obj)
+
+ // Handle pointer to struct
+ if val.Kind() == reflect.Ptr {
+ val = val.Elem()
+ }
+
+ if val.Kind() != reflect.Struct {
+ return result
+ }
+
+ typ := val.Type()
+
+ for i := 0; i < val.NumField(); i++ {
+ field := val.Field(i)
+ fieldType := typ.Field(i)
+
+ // Skip unexported fields
+ if !field.CanInterface() {
+ continue
+ }
+
+ // Handle embedded structs recursively (before JSON tag check)
+ if field.Kind() == reflect.Struct && fieldType.Anonymous {
+ embeddedMap := StructToMap(field.Interface())
+ for k, v := range embeddedMap {
+ result[k] = v
+ }
+ continue
+ }
+
+ // Get JSON tag name
+ jsonTag := fieldType.Tag.Get("json")
+ if jsonTag == "" || jsonTag == "-" {
+ continue
+ }
+
+ // Remove options like ",omitempty"
+ if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
+ jsonTag = jsonTag[:commaIdx]
+ }
+
+ result[jsonTag] = field.Interface()
+ }
+ return result
+}
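+
+// For example (illustrative, using a hypothetical config type):
+//
+//    type exampleConfig struct {
+//        BaseConfig
+//        Ratio float64 `json:"ratio"`
+//    }
+//
+// StructToMap flattens the embedded BaseConfig fields into the same map as
+// the outer fields, yielding keys such as "enabled", "scan_interval_seconds",
+// "max_concurrent", and "ratio".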
+
+// MapToStruct loads data from map into struct using reflection
+func MapToStruct(data map[string]interface{}, obj interface{}) error {
+ val := reflect.ValueOf(obj)
+
+ // Must be pointer to struct
+ if val.Kind() != reflect.Ptr || val.Elem().Kind() != reflect.Struct {
+ return fmt.Errorf("obj must be pointer to struct")
+ }
+
+ val = val.Elem()
+ typ := val.Type()
+
+ for i := 0; i < val.NumField(); i++ {
+ field := val.Field(i)
+ fieldType := typ.Field(i)
+
+ // Skip unexported fields
+ if !field.CanSet() {
+ continue
+ }
+
+ // Handle embedded structs recursively (before JSON tag check)
+ if field.Kind() == reflect.Struct && fieldType.Anonymous {
+ err := MapToStruct(data, field.Addr().Interface())
+ if err != nil {
+ return err
+ }
+ continue
+ }
+
+ // Get JSON tag name
+ jsonTag := fieldType.Tag.Get("json")
+ if jsonTag == "" || jsonTag == "-" {
+ continue
+ }
+
+ // Remove options like ",omitempty"
+ if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
+ jsonTag = jsonTag[:commaIdx]
+ }
+
+ if value, exists := data[jsonTag]; exists {
+ err := setFieldValue(field, value)
+ if err != nil {
+ return fmt.Errorf("failed to set field %s: %v", jsonTag, err)
+ }
+ }
+ }
+
+ return nil
+}
+
+// ToMap converts config to map using reflection
+// ToTaskPolicy converts BaseConfig to protobuf (partial implementation)
+// Note: Concrete implementations should override this to include task-specific config
+func (c *BaseConfig) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ // TaskConfig field should be set by concrete implementations
+ }
+}
+
+// FromTaskPolicy loads BaseConfig from protobuf (partial implementation)
+// Note: Concrete implementations should override this to handle task-specific config
+func (c *BaseConfig) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds)
+ return nil
+}
+
+// ApplySchemaDefaults applies default values from schema using reflection
+func (c *BaseConfig) ApplySchemaDefaults(schema *config.Schema) error {
+ // Use reflection-based approach for BaseConfig since it needs to handle embedded structs
+ return schema.ApplyDefaultsToProtobuf(c)
+}
+
+// setFieldValue sets a field value with type conversion
+func setFieldValue(field reflect.Value, value interface{}) error {
+ if value == nil {
+ return nil
+ }
+
+ valueVal := reflect.ValueOf(value)
+ fieldType := field.Type()
+ valueType := valueVal.Type()
+
+ // Direct assignment if types match
+ if valueType.AssignableTo(fieldType) {
+ field.Set(valueVal)
+ return nil
+ }
+
+ // Type conversion for common cases
+ switch fieldType.Kind() {
+ case reflect.Bool:
+ if b, ok := value.(bool); ok {
+ field.SetBool(b)
+ } else {
+ return fmt.Errorf("cannot convert %T to bool", value)
+ }
+ case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+ switch v := value.(type) {
+ case int:
+ field.SetInt(int64(v))
+ case int32:
+ field.SetInt(int64(v))
+ case int64:
+ field.SetInt(v)
+ case float64:
+ field.SetInt(int64(v))
+ default:
+ return fmt.Errorf("cannot convert %T to int", value)
+ }
+ case reflect.Float32, reflect.Float64:
+ switch v := value.(type) {
+ case float32:
+ field.SetFloat(float64(v))
+ case float64:
+ field.SetFloat(v)
+ case int:
+ field.SetFloat(float64(v))
+ case int64:
+ field.SetFloat(float64(v))
+ default:
+ return fmt.Errorf("cannot convert %T to float", value)
+ }
+ case reflect.String:
+ if s, ok := value.(string); ok {
+ field.SetString(s)
+ } else {
+ return fmt.Errorf("cannot convert %T to string", value)
+ }
+ default:
+ return fmt.Errorf("unsupported field type %s", fieldType.Kind())
+ }
+
+ return nil
+}
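+
+// Note: the float64 branches in setFieldValue matter in practice because
+// encoding/json decodes every JSON number into float64, so values arriving
+// via map[string]interface{} carry float64 even for integer fields.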
diff --git a/weed/worker/tasks/base/task_definition_test.go b/weed/worker/tasks/base/task_definition_test.go
new file mode 100644
index 000000000..a0a0a5a24
--- /dev/null
+++ b/weed/worker/tasks/base/task_definition_test.go
@@ -0,0 +1,338 @@
+package base
+
+import (
+ "reflect"
+ "testing"
+)
+
+// Test structs that mirror the actual configuration structure
+type TestBaseConfig struct {
+ Enabled bool `json:"enabled"`
+ ScanIntervalSeconds int `json:"scan_interval_seconds"`
+ MaxConcurrent int `json:"max_concurrent"`
+}
+
+type TestTaskConfig struct {
+ TestBaseConfig
+ TaskSpecificField float64 `json:"task_specific_field"`
+ AnotherSpecificField string `json:"another_specific_field"`
+}
+
+type TestNestedConfig struct {
+ TestBaseConfig
+ NestedStruct struct {
+ NestedField string `json:"nested_field"`
+ } `json:"nested_struct"`
+ TaskField int `json:"task_field"`
+}
+
+func TestStructToMap_WithEmbeddedStruct(t *testing.T) {
+ // Test case 1: Basic embedded struct
+ config := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 3,
+ },
+ TaskSpecificField: 0.25,
+ AnotherSpecificField: "test_value",
+ }
+
+ result := StructToMap(config)
+
+ // Verify all fields are present
+ expectedFields := map[string]interface{}{
+ "enabled": true,
+ "scan_interval_seconds": 1800,
+ "max_concurrent": 3,
+ "task_specific_field": 0.25,
+ "another_specific_field": "test_value",
+ }
+
+ if len(result) != len(expectedFields) {
+ t.Errorf("Expected %d fields, got %d. Result: %+v", len(expectedFields), len(result), result)
+ }
+
+ for key, expectedValue := range expectedFields {
+ if actualValue, exists := result[key]; !exists {
+ t.Errorf("Missing field: %s", key)
+ } else if !reflect.DeepEqual(actualValue, expectedValue) {
+ t.Errorf("Field %s: expected %v (%T), got %v (%T)", key, expectedValue, expectedValue, actualValue, actualValue)
+ }
+ }
+}
+
+func TestStructToMap_WithNestedStruct(t *testing.T) {
+ config := &TestNestedConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: false,
+ ScanIntervalSeconds: 3600,
+ MaxConcurrent: 1,
+ },
+ NestedStruct: struct {
+ NestedField string `json:"nested_field"`
+ }{
+ NestedField: "nested_value",
+ },
+ TaskField: 42,
+ }
+
+ result := StructToMap(config)
+
+ // Verify embedded struct fields are included
+ if enabled, exists := result["enabled"]; !exists || enabled != false {
+ t.Errorf("Expected enabled=false from embedded struct, got %v", enabled)
+ }
+
+ if scanInterval, exists := result["scan_interval_seconds"]; !exists || scanInterval != 3600 {
+ t.Errorf("Expected scan_interval_seconds=3600 from embedded struct, got %v", scanInterval)
+ }
+
+ if maxConcurrent, exists := result["max_concurrent"]; !exists || maxConcurrent != 1 {
+ t.Errorf("Expected max_concurrent=1 from embedded struct, got %v", maxConcurrent)
+ }
+
+ // Verify regular fields are included
+ if taskField, exists := result["task_field"]; !exists || taskField != 42 {
+ t.Errorf("Expected task_field=42, got %v", taskField)
+ }
+
+ // Verify nested struct is included as a whole
+ if nestedStruct, exists := result["nested_struct"]; !exists {
+ t.Errorf("Missing nested_struct field")
+ } else {
+ // The nested struct should be included as-is, not flattened
+ if nested, ok := nestedStruct.(struct {
+ NestedField string `json:"nested_field"`
+ }); !ok || nested.NestedField != "nested_value" {
+ t.Errorf("Expected nested_struct with NestedField='nested_value', got %v", nestedStruct)
+ }
+ }
+}
+
+func TestMapToStruct_WithEmbeddedStruct(t *testing.T) {
+ // Test data with all fields including embedded struct fields
+ data := map[string]interface{}{
+ "enabled": true,
+ "scan_interval_seconds": 2400,
+ "max_concurrent": 5,
+ "task_specific_field": 0.15,
+ "another_specific_field": "updated_value",
+ }
+
+ config := &TestTaskConfig{}
+ err := MapToStruct(data, config)
+
+ if err != nil {
+ t.Fatalf("MapToStruct failed: %v", err)
+ }
+
+ // Verify embedded struct fields were set
+ if config.Enabled != true {
+ t.Errorf("Expected Enabled=true, got %v", config.Enabled)
+ }
+
+ if config.ScanIntervalSeconds != 2400 {
+ t.Errorf("Expected ScanIntervalSeconds=2400, got %v", config.ScanIntervalSeconds)
+ }
+
+ if config.MaxConcurrent != 5 {
+ t.Errorf("Expected MaxConcurrent=5, got %v", config.MaxConcurrent)
+ }
+
+ // Verify regular fields were set
+ if config.TaskSpecificField != 0.15 {
+ t.Errorf("Expected TaskSpecificField=0.15, got %v", config.TaskSpecificField)
+ }
+
+ if config.AnotherSpecificField != "updated_value" {
+ t.Errorf("Expected AnotherSpecificField='updated_value', got %v", config.AnotherSpecificField)
+ }
+}
+
+func TestMapToStruct_PartialData(t *testing.T) {
+ // Test with only some fields present (simulating form data)
+ data := map[string]interface{}{
+ "enabled": false,
+ "max_concurrent": 2,
+ "task_specific_field": 0.30,
+ }
+
+ // Start with some initial values
+ config := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 1,
+ },
+ TaskSpecificField: 0.20,
+ AnotherSpecificField: "initial_value",
+ }
+
+ err := MapToStruct(data, config)
+
+ if err != nil {
+ t.Fatalf("MapToStruct failed: %v", err)
+ }
+
+ // Verify updated fields
+ if config.Enabled != false {
+ t.Errorf("Expected Enabled=false (updated), got %v", config.Enabled)
+ }
+
+ if config.MaxConcurrent != 2 {
+ t.Errorf("Expected MaxConcurrent=2 (updated), got %v", config.MaxConcurrent)
+ }
+
+ if config.TaskSpecificField != 0.30 {
+ t.Errorf("Expected TaskSpecificField=0.30 (updated), got %v", config.TaskSpecificField)
+ }
+
+ // Verify unchanged fields remain the same
+ if config.ScanIntervalSeconds != 1800 {
+ t.Errorf("Expected ScanIntervalSeconds=1800 (unchanged), got %v", config.ScanIntervalSeconds)
+ }
+
+ if config.AnotherSpecificField != "initial_value" {
+ t.Errorf("Expected AnotherSpecificField='initial_value' (unchanged), got %v", config.AnotherSpecificField)
+ }
+}
+
+func TestRoundTripSerialization(t *testing.T) {
+ // Test complete round-trip: struct -> map -> struct
+ original := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 3600,
+ MaxConcurrent: 4,
+ },
+ TaskSpecificField: 0.18,
+ AnotherSpecificField: "round_trip_test",
+ }
+
+ // Convert to map
+ dataMap := StructToMap(original)
+
+ // Convert back to struct
+ roundTrip := &TestTaskConfig{}
+ err := MapToStruct(dataMap, roundTrip)
+
+ if err != nil {
+ t.Fatalf("Round-trip MapToStruct failed: %v", err)
+ }
+
+ // Verify all fields match
+ if !reflect.DeepEqual(original.TestBaseConfig, roundTrip.TestBaseConfig) {
+ t.Errorf("BaseConfig mismatch:\nOriginal: %+v\nRound-trip: %+v", original.TestBaseConfig, roundTrip.TestBaseConfig)
+ }
+
+ if original.TaskSpecificField != roundTrip.TaskSpecificField {
+ t.Errorf("TaskSpecificField mismatch: %v != %v", original.TaskSpecificField, roundTrip.TaskSpecificField)
+ }
+
+ if original.AnotherSpecificField != roundTrip.AnotherSpecificField {
+ t.Errorf("AnotherSpecificField mismatch: %v != %v", original.AnotherSpecificField, roundTrip.AnotherSpecificField)
+ }
+}
+
+func TestStructToMap_EmptyStruct(t *testing.T) {
+ config := &TestTaskConfig{}
+ result := StructToMap(config)
+
+ // Should still include all fields, even with zero values
+ expectedFields := []string{"enabled", "scan_interval_seconds", "max_concurrent", "task_specific_field", "another_specific_field"}
+
+ for _, field := range expectedFields {
+ if _, exists := result[field]; !exists {
+ t.Errorf("Missing field: %s", field)
+ }
+ }
+}
+
+func TestStructToMap_NilPointer(t *testing.T) {
+ var config *TestTaskConfig = nil
+ result := StructToMap(config)
+
+ if len(result) != 0 {
+ t.Errorf("Expected empty map for nil pointer, got %+v", result)
+ }
+}
+
+func TestMapToStruct_InvalidInput(t *testing.T) {
+ data := map[string]interface{}{
+ "enabled": "not_a_bool", // Wrong type
+ }
+
+ config := &TestTaskConfig{}
+ err := MapToStruct(data, config)
+
+ if err == nil {
+ t.Errorf("Expected error for invalid input type, but got none")
+ }
+}
+
+func TestMapToStruct_NonPointer(t *testing.T) {
+ data := map[string]interface{}{
+ "enabled": true,
+ }
+
+ config := TestTaskConfig{} // Not a pointer
+ err := MapToStruct(data, config)
+
+ if err == nil {
+ t.Errorf("Expected error for non-pointer input, but got none")
+ }
+}
+
+// Benchmark tests to ensure performance is reasonable
+func BenchmarkStructToMap(b *testing.B) {
+ config := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 3,
+ },
+ TaskSpecificField: 0.25,
+ AnotherSpecificField: "benchmark_test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _ = StructToMap(config)
+ }
+}
+
+func BenchmarkMapToStruct(b *testing.B) {
+ data := map[string]interface{}{
+ "enabled": true,
+ "scan_interval_seconds": 1800,
+ "max_concurrent": 3,
+ "task_specific_field": 0.25,
+ "another_specific_field": "benchmark_test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ config := &TestTaskConfig{}
+ _ = MapToStruct(data, config)
+ }
+}
+
+func BenchmarkRoundTrip(b *testing.B) {
+ original := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 3,
+ },
+ TaskSpecificField: 0.25,
+ AnotherSpecificField: "benchmark_test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ dataMap := StructToMap(original)
+ roundTrip := &TestTaskConfig{}
+ _ = MapToStruct(dataMap, roundTrip)
+ }
+}
diff --git a/weed/worker/tasks/base/typed_task.go b/weed/worker/tasks/base/typed_task.go
new file mode 100644
index 000000000..9d2839607
--- /dev/null
+++ b/weed/worker/tasks/base/typed_task.go
@@ -0,0 +1,218 @@
+package base
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// BaseTypedTask provides a base implementation for typed tasks with logger support
+type BaseTypedTask struct {
+ taskType types.TaskType
+ taskID string
+ progress float64
+ progressCallback func(float64)
+ cancelled bool
+ mutex sync.RWMutex
+
+ // Logger functionality
+ logger tasks.TaskLogger
+ loggerConfig types.TaskLoggerConfig
+}
+
+// NewBaseTypedTask creates a new base typed task
+func NewBaseTypedTask(taskType types.TaskType) *BaseTypedTask {
+ return &BaseTypedTask{
+ taskType: taskType,
+ progress: 0.0,
+ loggerConfig: types.TaskLoggerConfig{
+ BaseLogDir: "/data/task_logs",
+ MaxTasks: 100,
+ MaxLogSizeMB: 10,
+ EnableConsole: true,
+ },
+ }
+}
+
+// GetType returns the task type
+func (bt *BaseTypedTask) GetType() types.TaskType {
+ return bt.taskType
+}
+
+// IsCancellable returns whether the task can be cancelled
+func (bt *BaseTypedTask) IsCancellable() bool {
+ return true // Most tasks can be cancelled
+}
+
+// Cancel cancels the task
+func (bt *BaseTypedTask) Cancel() error {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+ bt.cancelled = true
+ return nil
+}
+
+// IsCancelled returns whether the task has been cancelled
+func (bt *BaseTypedTask) IsCancelled() bool {
+ bt.mutex.RLock()
+ defer bt.mutex.RUnlock()
+ return bt.cancelled
+}
+
+// GetProgress returns the current progress (0-100)
+func (bt *BaseTypedTask) GetProgress() float64 {
+ bt.mutex.RLock()
+ defer bt.mutex.RUnlock()
+ return bt.progress
+}
+
+// SetProgress sets the current progress and calls the callback if set
+func (bt *BaseTypedTask) SetProgress(progress float64) {
+ bt.mutex.Lock()
+ callback := bt.progressCallback
+ bt.progress = progress
+ bt.mutex.Unlock()
+
+ if callback != nil {
+ callback(progress)
+ }
+}
+
+// SetProgressCallback sets the progress callback function
+func (bt *BaseTypedTask) SetProgressCallback(callback func(float64)) {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+ bt.progressCallback = callback
+}
+
+// SetLoggerConfig sets the logger configuration for this task
+func (bt *BaseTypedTask) SetLoggerConfig(config types.TaskLoggerConfig) {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+ bt.loggerConfig = config
+}
+
+// convertToTasksLoggerConfig converts types.TaskLoggerConfig to tasks.TaskLoggerConfig
+func convertToTasksLoggerConfig(config types.TaskLoggerConfig) tasks.TaskLoggerConfig {
+ return tasks.TaskLoggerConfig{
+ BaseLogDir: config.BaseLogDir,
+ MaxTasks: config.MaxTasks,
+ MaxLogSizeMB: config.MaxLogSizeMB,
+ EnableConsole: config.EnableConsole,
+ }
+}
+
+// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface)
+func (bt *BaseTypedTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+
+ bt.taskID = taskID
+
+ // Convert the logger config to the tasks package type
+ tasksLoggerConfig := convertToTasksLoggerConfig(bt.loggerConfig)
+
+ logger, err := tasks.NewTaskLogger(taskID, bt.taskType, workerID, params, tasksLoggerConfig)
+ if err != nil {
+ return fmt.Errorf("failed to initialize task logger: %w", err)
+ }
+
+ bt.logger = logger
+ if bt.logger != nil {
+ bt.logger.Info("BaseTypedTask initialized for task %s (type: %s)", taskID, bt.taskType)
+ }
+
+ return nil
+}
+
+// GetTaskLogger returns the task logger (LoggerProvider interface)
+func (bt *BaseTypedTask) GetTaskLogger() types.TaskLogger {
+ bt.mutex.RLock()
+ defer bt.mutex.RUnlock()
+ return bt.logger
+}
+
+// LogInfo logs an info message
+func (bt *BaseTypedTask) LogInfo(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Info(message, args...)
+ }
+}
+
+// LogWarning logs a warning message
+func (bt *BaseTypedTask) LogWarning(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Warning(message, args...)
+ }
+}
+
+// LogError logs an error message
+func (bt *BaseTypedTask) LogError(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Error(message, args...)
+ }
+}
+
+// LogDebug logs a debug message
+func (bt *BaseTypedTask) LogDebug(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Debug(message, args...)
+ }
+}
+
+// LogWithFields logs a message with structured fields
+func (bt *BaseTypedTask) LogWithFields(level string, message string, fields map[string]interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.LogWithFields(level, message, fields)
+ }
+}
+
+// ValidateTyped provides basic validation for typed parameters
+func (bt *BaseTypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
+ if params == nil {
+ return errors.New("task parameters cannot be nil")
+ }
+ if params.VolumeId == 0 {
+ return errors.New("volume_id is required")
+ }
+ if params.Server == "" {
+ return errors.New("server is required")
+ }
+ return nil
+}
+
+// EstimateTimeTyped provides a default time estimation
+func (bt *BaseTypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
+ // Default estimation - concrete tasks should override this
+ return 5 * time.Minute
+}
+
+// ExecuteTyped is a placeholder that concrete tasks must implement
+func (bt *BaseTypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
+ panic("ExecuteTyped must be implemented by concrete task types")
+}
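+
+// Example (illustrative sketch): a concrete task embeds BaseTypedTask and
+// provides its own ExecuteTyped. The type and body below are hypothetical.
+//
+//    type exampleTask struct {
+//        *BaseTypedTask
+//    }
+//
+//    func newExampleTask() types.TypedTaskInterface {
+//        return &exampleTask{BaseTypedTask: NewBaseTypedTask(types.TaskType("example"))}
+//    }
+//
+//    func (t *exampleTask) ExecuteTyped(params *worker_pb.TaskParams) error {
+//        t.LogInfo("processing volume %d", params.VolumeId)
+//        t.SetProgress(100.0)
+//        return nil
+//    }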
diff --git a/weed/worker/tasks/config_update_registry.go b/weed/worker/tasks/config_update_registry.go
new file mode 100644
index 000000000..649c8b384
--- /dev/null
+++ b/weed/worker/tasks/config_update_registry.go
@@ -0,0 +1,67 @@
+package tasks
+
+import (
+ "sync"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// ConfigUpdateFunc is a function type for updating task configurations
+type ConfigUpdateFunc func(configPersistence interface{}) error
+
+// ConfigUpdateRegistry manages config update functions for all task types
+type ConfigUpdateRegistry struct {
+ updaters map[types.TaskType]ConfigUpdateFunc
+ mutex sync.RWMutex
+}
+
+var (
+ globalConfigUpdateRegistry *ConfigUpdateRegistry
+ configUpdateRegistryOnce sync.Once
+)
+
+// GetGlobalConfigUpdateRegistry returns the global config update registry (singleton)
+func GetGlobalConfigUpdateRegistry() *ConfigUpdateRegistry {
+ configUpdateRegistryOnce.Do(func() {
+ globalConfigUpdateRegistry = &ConfigUpdateRegistry{
+ updaters: make(map[types.TaskType]ConfigUpdateFunc),
+ }
+ glog.V(1).Infof("Created global config update registry")
+ })
+ return globalConfigUpdateRegistry
+}
+
+// RegisterConfigUpdater registers a config update function for a task type
+func (r *ConfigUpdateRegistry) RegisterConfigUpdater(taskType types.TaskType, updateFunc ConfigUpdateFunc) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ r.updaters[taskType] = updateFunc
+ glog.V(1).Infof("Registered config updater for task type: %s", taskType)
+}
+
+// UpdateAllConfigs updates configurations for all registered task types
+func (r *ConfigUpdateRegistry) UpdateAllConfigs(configPersistence interface{}) {
+ r.mutex.RLock()
+ updaters := make(map[types.TaskType]ConfigUpdateFunc)
+ for k, v := range r.updaters {
+ updaters[k] = v
+ }
+ r.mutex.RUnlock()
+
+ for taskType, updateFunc := range updaters {
+ if err := updateFunc(configPersistence); err != nil {
+ glog.Warningf("Failed to load %s configuration from persistence: %v", taskType, err)
+ } else {
+ glog.V(1).Infof("Loaded %s configuration from persistence", taskType)
+ }
+ }
+
+ glog.V(1).Infof("All task configurations loaded from persistence")
+}
+
+// AutoRegisterConfigUpdater is a convenience function for registering config updaters
+func AutoRegisterConfigUpdater(taskType types.TaskType, updateFunc ConfigUpdateFunc) {
+ registry := GetGlobalConfigUpdateRegistry()
+ registry.RegisterConfigUpdater(taskType, updateFunc)
+}
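+
+// Example (illustrative sketch): a task package can register its updater from
+// an init function so UpdateAllConfigs reloads its settings from persistence.
+// The closure and task type below are hypothetical.
+//
+//    func init() {
+//        tasks.AutoRegisterConfigUpdater(types.TaskType("example"),
+//            func(configPersistence interface{}) error {
+//                // reload this task's configuration from the persistence layer
+//                return nil
+//            })
+//    }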
diff --git a/weed/worker/tasks/erasure_coding/config.go b/weed/worker/tasks/erasure_coding/config.go
new file mode 100644
index 000000000..1f70fb8db
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/config.go
@@ -0,0 +1,207 @@
+package erasure_coding
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with erasure coding specific settings
+type Config struct {
+ base.BaseConfig
+ QuietForSeconds int `json:"quiet_for_seconds"`
+ FullnessRatio float64 `json:"fullness_ratio"`
+ CollectionFilter string `json:"collection_filter"`
+ MinSizeMB int `json:"min_size_mb"`
+}
+
+// NewDefaultConfig creates a new default erasure coding configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 60 * 60, // 1 hour
+ MaxConcurrent: 1,
+ },
+ QuietForSeconds: 300, // 5 minutes
+ FullnessRatio: 0.8, // 80%
+ CollectionFilter: "",
+ MinSizeMB: 30, // 30MB (more reasonable than 100MB)
+ }
+}
+
+// GetConfigSpec returns the configuration schema for erasure coding tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Erasure Coding Tasks",
+ Description: "Whether erasure coding tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic erasure coding task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 60 * 60,
+ MinValue: 10 * 60,
+ MaxValue: 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing erasure coding",
+ HelpText: "The system will check for volumes that need erasure coding at this interval",
+ Placeholder: "1",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 1,
+ MinValue: 1,
+ MaxValue: 5,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of erasure coding tasks that can run simultaneously",
+ HelpText: "Limits the number of erasure coding operations running at the same time",
+ Placeholder: "1 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "quiet_for_seconds",
+ JSONName: "quiet_for_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 300,
+ MinValue: 60,
+ MaxValue: 3600,
+ Required: true,
+ DisplayName: "Quiet Period",
+ Description: "Minimum time volume must be quiet before erasure coding",
+ HelpText: "Volume must not be modified for this duration before erasure coding",
+ Placeholder: "5",
+ Unit: config.UnitMinutes,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "fullness_ratio",
+ JSONName: "fullness_ratio",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.8,
+ MinValue: 0.1,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Fullness Ratio",
+ Description: "Minimum fullness ratio to trigger erasure coding",
+ HelpText: "Only volumes with this fullness ratio or higher will be erasure coded",
+ Placeholder: "0.80 (80%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "collection_filter",
+ JSONName: "collection_filter",
+ Type: config.FieldTypeString,
+ DefaultValue: "",
+ Required: false,
+ DisplayName: "Collection Filter",
+ Description: "Only process volumes from specific collections",
+ HelpText: "Leave empty to process all collections, or specify collection name",
+ Placeholder: "my_collection",
+ InputType: "text",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_size_mb",
+ JSONName: "min_size_mb",
+ Type: config.FieldTypeInt,
+ DefaultValue: 30,
+ MinValue: 1,
+ MaxValue: 1000,
+ Required: true,
+ DisplayName: "Minimum Size (MB)",
+ Description: "Minimum volume size to consider for erasure coding",
+ HelpText: "Only volumes larger than this size will be considered for erasure coding",
+ Placeholder: "30",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
+
+// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
+func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: float64(c.FullnessRatio),
+ QuietForSeconds: int32(c.QuietForSeconds),
+ MinVolumeSizeMb: int32(c.MinSizeMB),
+ CollectionFilter: c.CollectionFilter,
+ },
+ },
+ }
+}
+
+// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
+func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+
+ // Set general TaskPolicy fields
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
+
+ // Set erasure coding-specific fields from the task config
+ if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil {
+ c.FullnessRatio = float64(ecConfig.FullnessRatio)
+ c.QuietForSeconds = int(ecConfig.QuietForSeconds)
+ c.MinSizeMB = int(ecConfig.MinVolumeSizeMb)
+ c.CollectionFilter = ecConfig.CollectionFilter
+ }
+
+ return nil
+}
+
+// LoadConfigFromPersistence loads configuration from the persistence layer if available
+func LoadConfigFromPersistence(configPersistence interface{}) *Config {
+ config := NewDefaultConfig()
+
+ // Try to load from persistence if available
+ if persistence, ok := configPersistence.(interface {
+ LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error)
+ }); ok {
+ if policy, err := persistence.LoadErasureCodingTaskPolicy(); err == nil && policy != nil {
+ if err := config.FromTaskPolicy(policy); err == nil {
+ glog.V(1).Infof("Loaded erasure coding configuration from persistence")
+ return config
+ }
+ }
+ }
+
+ glog.V(1).Infof("Using default erasure coding configuration")
+ return config
+}
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
new file mode 100644
index 000000000..1a2558396
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -0,0 +1,140 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for erasure coding tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ ecConfig, ok := config.(*Config)
+ if !ok {
+ return nil, fmt.Errorf("unexpected config type %T for erasure coding detection", config)
+ }
+ var results []*types.TaskDetectionResult
+ now := time.Now()
+ quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
+ minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum
+
+ skippedAlreadyEC := 0
+ skippedTooSmall := 0
+ skippedCollectionFilter := 0
+ skippedQuietTime := 0
+ skippedFullness := 0
+
+ for _, metric := range metrics {
+ // Skip if already EC volume
+ if metric.IsECVolume {
+ skippedAlreadyEC++
+ continue
+ }
+
+ // Check minimum size requirement
+ if metric.Size < minSizeBytes {
+ skippedTooSmall++
+ continue
+ }
+
+ // Check collection filter if specified
+ if ecConfig.CollectionFilter != "" {
+ // Parse comma-separated collections
+ allowedCollections := make(map[string]bool)
+ for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
+ allowedCollections[strings.TrimSpace(collection)] = true
+ }
+ // Skip if volume's collection is not in the allowed list
+ if !allowedCollections[metric.Collection] {
+ skippedCollectionFilter++
+ continue
+ }
+ }
+
+ // Check quiet duration and fullness criteria
+ if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeErasureCoding,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: types.TaskPriorityLow, // EC is not urgent
+ Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (≥%ds), fullness=%.1f%% (≥%.1f%%), size=%.1fMB (≥%dMB)",
+ metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
+ float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
+ ScheduleAt: now,
+ }
+ results = append(results, result)
+ } else {
+ // Track skip reasons so the summary log below reports accurate totals
+ if metric.Age < quietThreshold {
+ skippedQuietTime++
+ }
+ if metric.FullnessRatio < ecConfig.FullnessRatio {
+ skippedFullness++
+ }
+ }
+ }
+
+ // Log debug summary if no tasks were created
+ if len(results) == 0 && len(metrics) > 0 {
+ totalVolumes := len(metrics)
+ glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
+ totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness)
+
+ // Show details for first few volumes
+ for i, metric := range metrics {
+ if i >= 3 || metric.IsECVolume { // Show at most the first 3 metrics, skipping EC volumes
+ continue
+ }
+ sizeMB := float64(metric.Size) / (1024 * 1024)
+ glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need β‰₯%dMB), age=%s (need β‰₯%s), fullness=%.1f%% (need β‰₯%.1f%%)",
+ metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute),
+ metric.FullnessRatio*100, ecConfig.FullnessRatio*100)
+ }
+ }
+
+ return results, nil
+}
+
+// Scheduling implements the scheduling logic for erasure coding tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ ecConfig, ok := config.(*Config)
+ if !ok {
+ return false
+ }
+
+ // Check if we have available workers
+ if len(availableWorkers) == 0 {
+ return false
+ }
+
+ // Count running EC tasks
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeErasureCoding {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningCount >= ecConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check if any worker can handle EC tasks
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeErasureCoding {
+ return true
+ }
+ }
+ }
+
+ return false
+}
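+
+// Example (illustrative sketch): Detection and Scheduling are designed to be
+// plugged into a base.TaskDefinition. The wiring below is a sketch of that
+// intent, not the actual registration code.
+//
+//    taskDef := &base.TaskDefinition{
+//        Type:           types.TaskTypeErasureCoding,
+//        Name:           "erasure_coding",
+//        Config:         NewDefaultConfig(),
+//        ConfigSpec:     GetConfigSpec(),
+//        DetectionFunc:  Detection,
+//        SchedulingFunc: Scheduling,
+//        ScanInterval:   time.Hour,
+//        MaxConcurrent:  1,
+//    }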
diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go
index 641dfc6b5..8dc7a1cd0 100644
--- a/weed/worker/tasks/erasure_coding/ec.go
+++ b/weed/worker/tasks/erasure_coding/ec.go
@@ -1,79 +1,785 @@
package erasure_coding
import (
+ "context"
"fmt"
+ "io"
+ "math"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
)
-// Task implements erasure coding operation to convert volumes to EC format
+// Task implements comprehensive erasure coding with protobuf parameters
type Task struct {
- *tasks.BaseTask
- server string
- volumeID uint32
+ *base.BaseTypedTask
+
+ // Current task state
+ sourceServer string
+ volumeID uint32
+ collection string
+ workDir string
+ masterClient string
+ grpcDialOpt grpc.DialOption
+
+ // EC parameters from protobuf
+ destinations []*worker_pb.ECDestination // Disk-aware destinations
+ existingShardLocations []*worker_pb.ExistingECShardLocation // Existing shards to cleanup
+ estimatedShardSize uint64
+ dataShards int
+ parityShards int
+ cleanupSource bool
+
+ // Progress tracking
+ currentStep string
+ stepProgress map[string]float64
}
-// NewTask creates a new erasure coding task instance
-func NewTask(server string, volumeID uint32) *Task {
+// NewTask creates a new erasure coding task
+func NewTask() types.TypedTaskInterface {
task := &Task{
- BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding),
- server: server,
- volumeID: volumeID,
+ BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeErasureCoding),
+ masterClient: "localhost:9333", // Default master client
+ workDir: "/tmp/seaweedfs_ec_work", // Default work directory
+ grpcDialOpt: grpc.WithTransportCredentials(insecure.NewCredentials()), // Default to insecure
+ dataShards: erasure_coding.DataShardsCount, // Use package constant
+ parityShards: erasure_coding.ParityShardsCount, // Use package constant
+ stepProgress: make(map[string]float64),
}
return task
}
-// Execute executes the erasure coding task
-func (t *Task) Execute(params types.TaskParams) error {
- glog.Infof("Starting erasure coding task for volume %d on server %s", t.volumeID, t.server)
+// ValidateTyped validates the typed parameters for EC task
+func (t *Task) ValidateTyped(params *worker_pb.TaskParams) error {
+ // Basic validation from base class
+ if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
+ return err
+ }
+
+ // Check that we have EC-specific parameters
+ ecParams := params.GetErasureCodingParams()
+ if ecParams == nil {
+ return fmt.Errorf("erasure_coding_params is required for EC task")
+ }
+
+ // Require destinations
+ if len(ecParams.Destinations) == 0 {
+ return fmt.Errorf("destinations must be specified for EC task")
+ }
+
+ // DataShards and ParityShards are constants from erasure_coding package
+ expectedDataShards := int32(erasure_coding.DataShardsCount)
+ expectedParityShards := int32(erasure_coding.ParityShardsCount)
+
+ if ecParams.DataShards > 0 && ecParams.DataShards != expectedDataShards {
+ return fmt.Errorf("data_shards must be %d (fixed constant), got %d", expectedDataShards, ecParams.DataShards)
+ }
+ if ecParams.ParityShards > 0 && ecParams.ParityShards != expectedParityShards {
+ return fmt.Errorf("parity_shards must be %d (fixed constant), got %d", expectedParityShards, ecParams.ParityShards)
+ }
+
+ // Validate destination count
+ destinationCount := len(ecParams.Destinations)
+ totalShards := expectedDataShards + expectedParityShards
+ if totalShards > int32(destinationCount) {
+ return fmt.Errorf("insufficient destinations: need %d, have %d", totalShards, destinationCount)
+ }
+
+ return nil
+}
+
+// EstimateTimeTyped estimates the time needed for EC processing based on protobuf parameters
+func (t *Task) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
+ baseTime := 20 * time.Minute // Processing takes time due to comprehensive operations
+
+ ecParams := params.GetErasureCodingParams()
+ if ecParams != nil && ecParams.EstimatedShardSize > 0 {
+ // More accurate estimate based on shard size
+ // Account for copying, encoding, and distribution
+ gbSize := ecParams.EstimatedShardSize / (1024 * 1024 * 1024)
+ estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB
+ if estimatedTime > baseTime {
+ return estimatedTime
+ }
+ }
+
+ return baseTime
+}
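+
+// Worked example (illustrative): an EstimatedShardSize of 15 GiB gives
+// 15 * 2 = 30 minutes, which exceeds the 20 minute base and is returned.
+// Sizes under 1 GiB truncate to 0 in the integer division above and fall
+// back to the base estimate.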
+
+// ExecuteTyped implements the actual erasure coding workflow with typed parameters
+func (t *Task) ExecuteTyped(params *worker_pb.TaskParams) error {
+ // Extract basic parameters
+ t.volumeID = params.VolumeId
+ t.sourceServer = params.Server
+ t.collection = params.Collection
- // Simulate erasure coding operation with progress updates
- steps := []struct {
- name string
- duration time.Duration
- progress float64
- }{
- {"Analyzing volume", 2 * time.Second, 15},
- {"Creating EC shards", 5 * time.Second, 50},
- {"Verifying shards", 2 * time.Second, 75},
- {"Finalizing EC volume", 1 * time.Second, 100},
+ // Extract EC-specific parameters
+ ecParams := params.GetErasureCodingParams()
+ if ecParams != nil {
+ t.destinations = ecParams.Destinations // Store disk-aware destinations
+ t.existingShardLocations = ecParams.ExistingShardLocations // Store existing shards for cleanup
+ t.estimatedShardSize = ecParams.EstimatedShardSize
+ t.cleanupSource = ecParams.CleanupSource
+
+ // DataShards and ParityShards are constants, don't override from parameters
+ // t.dataShards and t.parityShards are already set to constants in NewTask
+
+ if ecParams.WorkingDir != "" {
+ t.workDir = ecParams.WorkingDir
+ }
+ if ecParams.MasterClient != "" {
+ t.masterClient = ecParams.MasterClient
+ }
}
- for _, step := range steps {
- if t.IsCancelled() {
- return fmt.Errorf("erasure coding task cancelled")
+ // Determine available destinations for logging
+ var availableDestinations []string
+ for _, dest := range t.destinations {
+ availableDestinations = append(availableDestinations, fmt.Sprintf("%s(disk:%d)", dest.Node, dest.DiskId))
+ }
+
+ glog.V(1).Infof("Starting EC task for volume %d: %s -> %v (data:%d, parity:%d)",
+ t.volumeID, t.sourceServer, availableDestinations, t.dataShards, t.parityShards)
+
+ // Create unique working directory for this task
+ taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
+ if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
+ return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
+ }
+ glog.V(1).Infof("WORKFLOW: Created working directory: %s", taskWorkDir)
+
+ // Ensure cleanup of working directory
+ defer func() {
+ if err := os.RemoveAll(taskWorkDir); err != nil {
+ glog.Warningf("Failed to cleanup working directory %s: %v", taskWorkDir, err)
+ } else {
+ glog.V(1).Infof("WORKFLOW: Cleaned up working directory: %s", taskWorkDir)
}
+ }()
+
+ // Step 1: Collect volume locations from master
+ glog.V(1).Infof("WORKFLOW STEP 1: Collecting volume locations from master")
+ t.SetProgress(5.0)
+ volumeId := needle.VolumeId(t.volumeID)
+ volumeLocations, err := t.collectVolumeLocations(volumeId)
+ if err != nil {
+ return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Found volume %d on %d servers: %v", t.volumeID, len(volumeLocations), volumeLocations)
- glog.V(1).Infof("Erasure coding task step: %s", step.name)
- t.SetProgress(step.progress)
+ // Convert ServerAddress slice to string slice
+ var locationStrings []string
+ for _, addr := range volumeLocations {
+ locationStrings = append(locationStrings, string(addr))
+ }
- // Simulate work
- time.Sleep(step.duration)
+ // Step 2: Check if volume has sufficient size for EC encoding
+ if !t.shouldPerformECEncoding(locationStrings) {
+ glog.Infof("Volume %d does not meet EC encoding criteria, skipping", t.volumeID)
+ t.SetProgress(100.0)
+ return nil
}
- glog.Infof("Erasure coding task completed for volume %d on server %s", t.volumeID, t.server)
+ // Step 3: Cleanup existing EC shards if any
+ glog.V(1).Infof("WORKFLOW STEP 3: Cleaning up existing EC shards for volume %d", t.volumeID)
+ t.SetProgress(10.0)
+ err = t.cleanupExistingEcShards()
+ if err != nil {
+ glog.Warningf("Failed to cleanup existing EC shards (continuing anyway): %v", err)
+ // Don't fail the task - this is just cleanup
+ }
+ glog.V(1).Infof("WORKFLOW: Existing EC shards cleanup completed for volume %d", t.volumeID)
+
+ // Step 4: Mark volume readonly on all servers
+ glog.V(1).Infof("WORKFLOW STEP 4: Marking volume %d readonly on all replica servers", t.volumeID)
+ t.SetProgress(15.0)
+ err = t.markVolumeReadonlyOnAllReplicas(needle.VolumeId(t.volumeID), locationStrings)
+ if err != nil {
+ return fmt.Errorf("failed to mark volume readonly: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Volume %d marked readonly on all replicas", t.volumeID)
+
+ // Step 5: Copy volume files (.dat, .idx) to EC worker
+ glog.V(1).Infof("WORKFLOW STEP 5: Copying volume files from source server %s to EC worker", t.sourceServer)
+ t.SetProgress(25.0)
+ localVolumeFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to copy volume files to EC worker: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Volume files copied to EC worker: %v", localVolumeFiles)
+
+ // Step 6: Generate EC shards locally on EC worker
+ glog.V(1).Infof("WORKFLOW STEP 6: Generating EC shards locally on EC worker")
+ t.SetProgress(40.0)
+ localShardFiles, err := t.generateEcShardsLocally(localVolumeFiles, taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to generate EC shards locally: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: EC shards generated locally: %d shard files", len(localShardFiles))
+
+ // Step 7: Distribute shards from EC worker to destination servers
+ glog.V(1).Infof("WORKFLOW STEP 7: Distributing EC shards from worker to destination servers")
+ t.SetProgress(60.0)
+ err = t.distributeEcShardsFromWorker(localShardFiles)
+ if err != nil {
+ return fmt.Errorf("failed to distribute EC shards from worker: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: EC shards distributed to all destination servers")
+
+ // Step 8: Mount EC shards on destination servers
+ glog.V(1).Infof("WORKFLOW STEP 8: Mounting EC shards on destination servers")
+ t.SetProgress(80.0)
+ err = t.mountEcShardsOnDestinations()
+ if err != nil {
+ return fmt.Errorf("failed to mount EC shards: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: EC shards mounted successfully")
+
+ // Step 9: Delete original volume from all locations
+ glog.V(1).Infof("WORKFLOW STEP 9: Deleting original volume %d from all replica servers", t.volumeID)
+ t.SetProgress(90.0)
+ err = t.deleteVolumeFromAllLocations(needle.VolumeId(t.volumeID), locationStrings)
+ if err != nil {
+ return fmt.Errorf("failed to delete original volume: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Original volume %d deleted from all locations", t.volumeID)
+
+ t.SetProgress(100.0)
+ glog.Infof("EC task completed successfully for volume %d", t.volumeID)
return nil
}
-// Validate validates the task parameters
-func (t *Task) Validate(params types.TaskParams) error {
- if params.VolumeID == 0 {
- return fmt.Errorf("volume_id is required")
+// collectVolumeLocations gets volume location from master (placeholder implementation)
+func (t *Task) collectVolumeLocations(volumeId needle.VolumeId) ([]pb.ServerAddress, error) {
+ // For now, return a placeholder implementation
+ // Full implementation would call master to get volume locations
+ return []pb.ServerAddress{pb.ServerAddress(t.sourceServer)}, nil
+}
+
+// cleanupExistingEcShards deletes existing EC shards using planned locations
+func (t *Task) cleanupExistingEcShards() error {
+ if len(t.existingShardLocations) == 0 {
+ glog.V(1).Infof("No existing EC shards to cleanup for volume %d", t.volumeID)
+ return nil
}
- if params.Server == "" {
- return fmt.Errorf("server is required")
+
+ glog.V(1).Infof("Cleaning up existing EC shards for volume %d on %d servers", t.volumeID, len(t.existingShardLocations))
+
+ // Delete existing shards from each location using planned shard locations
+ for _, location := range t.existingShardLocations {
+ if len(location.ShardIds) == 0 {
+ continue
+ }
+
+ glog.V(1).Infof("Deleting existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(location.Node), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
+ VolumeId: t.volumeID,
+ Collection: t.collection,
+ ShardIds: location.ShardIds,
+ })
+ return deleteErr
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to delete existing EC shards %v from %s for volume %d: %v", location.ShardIds, location.Node, t.volumeID, err)
+ // Continue with other servers - don't fail the entire cleanup
+ } else {
+ glog.V(1).Infof("Successfully deleted existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID)
+ }
}
+
+ glog.V(1).Infof("Completed cleanup of existing EC shards for volume %d", t.volumeID)
return nil
}
-// EstimateTime estimates the time needed for the task
-func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
- // Base time for erasure coding operation
- baseTime := 30 * time.Second
+// shouldPerformECEncoding checks if the volume meets criteria for EC encoding
+func (t *Task) shouldPerformECEncoding(volumeLocations []string) bool {
+ // For now, always proceed with EC encoding if volume exists
+ // This can be extended with volume size checks, etc.
+ return len(volumeLocations) > 0
+}
- // Could adjust based on volume size or other factors
- return baseTime
+// markVolumeReadonlyOnAllReplicas marks the volume as readonly on all replica servers
+func (t *Task) markVolumeReadonlyOnAllReplicas(volumeId needle.VolumeId, volumeLocations []string) error {
+ glog.V(1).Infof("Marking volume %d readonly on %d servers", volumeId, len(volumeLocations))
+
+ // Mark volume readonly on all replica servers
+ for _, location := range volumeLocations {
+ glog.V(1).Infof("Marking volume %d readonly on %s", volumeId, location)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, markErr := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
+ VolumeId: uint32(volumeId),
+ })
+ return markErr
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to mark volume %d readonly on %s: %v", volumeId, location, err)
+ return fmt.Errorf("failed to mark volume %d readonly on %s: %v", volumeId, location, err)
+ }
+
+ glog.V(1).Infof("Successfully marked volume %d readonly on %s", volumeId, location)
+ }
+
+ glog.V(1).Infof("Successfully marked volume %d readonly on all %d locations", volumeId, len(volumeLocations))
+ return nil
+}
+
+// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker
+func (t *Task) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
+ localFiles := make(map[string]string)
+
+ // Copy .dat file
+ datFile := fmt.Sprintf("%s.dat", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID)))
+ err := t.copyFileFromSource(".dat", datFile)
+ if err != nil {
+ return nil, fmt.Errorf("failed to copy .dat file: %v", err)
+ }
+ localFiles["dat"] = datFile
+ glog.V(1).Infof("Copied .dat file to: %s", datFile)
+
+ // Copy .idx file
+ idxFile := fmt.Sprintf("%s.idx", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID)))
+ err = t.copyFileFromSource(".idx", idxFile)
+ if err != nil {
+ return nil, fmt.Errorf("failed to copy .idx file: %v", err)
+ }
+ localFiles["idx"] = idxFile
+ glog.V(1).Infof("Copied .idx file to: %s", idxFile)
+
+ return localFiles, nil
+}
+
+// copyFileFromSource copies a file from source server to local path using gRPC streaming
+func (t *Task) copyFileFromSource(ext, localPath string) error {
+ return operation.WithVolumeServerClient(false, pb.ServerAddress(t.sourceServer), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: t.volumeID,
+ Collection: t.collection,
+ Ext: ext,
+ StopOffset: uint64(math.MaxInt64),
+ })
+ if err != nil {
+ return fmt.Errorf("failed to initiate file copy: %v", err)
+ }
+
+ // Create local file
+ localFile, err := os.Create(localPath)
+ if err != nil {
+ return fmt.Errorf("failed to create local file %s: %v", localPath, err)
+ }
+ defer localFile.Close()
+
+ // Stream data and write to local file
+ totalBytes := int64(0)
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("failed to receive file data: %v", err)
+ }
+
+ if len(resp.FileContent) > 0 {
+ written, writeErr := localFile.Write(resp.FileContent)
+ if writeErr != nil {
+ return fmt.Errorf("failed to write to local file: %v", writeErr)
+ }
+ totalBytes += int64(written)
+ }
+ }
+
+ glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.sourceServer, localPath)
+ return nil
+ })
+}
+
+// generateEcShardsLocally generates EC shards from local volume files
+func (t *Task) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
+ datFile := localFiles["dat"]
+ idxFile := localFiles["idx"]
+
+ if datFile == "" || idxFile == "" {
+ return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
+ }
+
+ // Get base name without extension for EC operations
+ baseName := strings.TrimSuffix(datFile, ".dat")
+
+ shardFiles := make(map[string]string)
+
+ glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
+
+ // Generate EC shard files (.ec00 ~ .ec13)
+ if err := erasure_coding.WriteEcFiles(baseName); err != nil {
+ return nil, fmt.Errorf("failed to generate EC shard files: %v", err)
+ }
+
+ // Generate .ecx file from .idx
+ if err := erasure_coding.WriteSortedFileFromIdx(idxFile, ".ecx"); err != nil {
+ return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
+ }
+
+ // Collect generated shard file paths
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
+ if _, err := os.Stat(shardFile); err == nil {
+ shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile
+ }
+ }
+
+ // Add metadata files
+ ecxFile := idxFile + ".ecx"
+ if _, err := os.Stat(ecxFile); err == nil {
+ shardFiles["ecx"] = ecxFile
+ }
+
+ // Generate .vif file (volume info)
+ vifFile := baseName + ".vif"
+ // Create basic volume info - in a real implementation, this would come from the original volume
+ volumeInfo := &volume_server_pb.VolumeInfo{
+ Version: uint32(needle.GetCurrentVersion()),
+ }
+ if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
+ glog.Warningf("Failed to create .vif file: %v", err)
+ } else {
+ shardFiles["vif"] = vifFile
+ }
+
+ glog.V(1).Infof("Generated %d EC files locally", len(shardFiles))
+ return shardFiles, nil
+}
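+
+// Illustrative layout of the task working directory for volume 123 after
+// this step (file names follow the code above; the volume id is hypothetical):
+//
+//    123.dat, 123.idx        copied volume files
+//    123.ec00 ... 123.ec13   the 14 generated EC shard files
+//    123.idx.ecx             sorted index generated from 123.idx
+//    123.vif                 volume info file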
+
+func (t *Task) copyEcShardsToDestinations() error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for EC shard distribution")
+ }
+
+ destinations := t.destinations
+
+ glog.V(1).Infof("Copying EC shards for volume %d to %d destinations", t.volumeID, len(destinations))
+
+ // Prepare shard IDs (0-13 for EC shards)
+ var shardIds []uint32
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardIds = append(shardIds, uint32(i))
+ }
+
+ // Distribute shards across destinations
+ var wg sync.WaitGroup
+ errorChan := make(chan error, len(destinations))
+
+ // Track which disks have already received metadata files (server+disk)
+ metadataFilesCopied := make(map[string]bool)
+ var metadataMutex sync.Mutex
+
+ // For each destination, copy a subset of shards
+ shardsPerDest := len(shardIds) / len(destinations)
+ remainder := len(shardIds) % len(destinations)
+
+ shardOffset := 0
+ for i, dest := range destinations {
+ wg.Add(1)
+
+ shardsForThisDest := shardsPerDest
+ if i < remainder {
+ shardsForThisDest++ // Distribute remainder shards
+ }
+
+ destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
+ shardOffset += shardsForThisDest
+
+ go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
+ defer wg.Done()
+
+ if t.IsCancelled() {
+ errorChan <- fmt.Errorf("task cancelled during shard copy")
+ return
+ }
+
+ // Create disk-specific metadata key (server+disk)
+ diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
+
+ glog.V(1).Infof("Copying shards %v from %s to %s (disk %d)",
+ targetShardIds, t.sourceServer, destination.Node, destination.DiskId)
+
+ // Check if this disk needs metadata files (only once per disk)
+ metadataMutex.Lock()
+ needsMetadataFiles := !metadataFilesCopied[diskKey]
+ if needsMetadataFiles {
+ metadataFilesCopied[diskKey] = true
+ }
+ metadataMutex.Unlock()
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
+ VolumeId: uint32(t.volumeID),
+ Collection: t.collection,
+ ShardIds: targetShardIds,
+ CopyEcxFile: needsMetadataFiles, // Copy .ecx only once per disk
+ CopyEcjFile: needsMetadataFiles, // Copy .ecj only once per disk
+ CopyVifFile: needsMetadataFiles, // Copy .vif only once per disk
+ SourceDataNode: t.sourceServer,
+ DiskId: destination.DiskId, // Pass target disk ID
+ })
+ return copyErr
+ })
+
+ if err != nil {
+ errorChan <- fmt.Errorf("failed to copy shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
+ return
+ }
+
+ if needsMetadataFiles {
+ glog.V(1).Infof("Successfully copied shards %v and metadata files (.ecx, .ecj, .vif) to %s disk %d",
+ targetShardIds, destination.Node, destination.DiskId)
+ } else {
+ glog.V(1).Infof("Successfully copied shards %v to %s disk %d (metadata files already present)",
+ targetShardIds, destination.Node, destination.DiskId)
+ }
+ }(dest, destShardIds)
+ }
+
+ wg.Wait()
+ close(errorChan)
+
+ // Check for any copy errors
+ if err := <-errorChan; err != nil {
+ return err
+ }
+
+ glog.V(1).Infof("Successfully copied all EC shards for volume %d", t.volumeID)
+ return nil
+}
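
The split above gives each destination len(shardIds)/len(destinations) shards,
with the first remainder destinations taking one extra. The same arithmetic as
a standalone sketch (splitShards is a hypothetical helper, not part of this
change):

    // splitShards divides shard IDs 0..total-1 across n destinations,
    // giving the first total%n destinations one extra shard each.
    func splitShards(total, n int) [][]uint32 {
        per, rem := total/n, total%n
        out := make([][]uint32, n)
        id := uint32(0)
        for i := 0; i < n; i++ {
            count := per
            if i < rem {
                count++
            }
            for j := 0; j < count; j++ {
                out[i] = append(out[i], id)
                id++
            }
        }
        return out
    }
    // splitShards(14, 4) => [[0 1 2 3] [4 5 6 7] [8 9 10] [11 12 13]]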
+
+// distributeEcShardsFromWorker distributes locally generated EC shards to destination servers
+func (t *Task) distributeEcShardsFromWorker(localShardFiles map[string]string) error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for EC shard distribution")
+ }
+
+ destinations := t.destinations
+
+ glog.V(1).Infof("Distributing EC shards for volume %d from worker to %d destinations", t.volumeID, len(destinations))
+
+ // Prepare shard IDs (0-13 for EC shards)
+ var shardIds []uint32
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardIds = append(shardIds, uint32(i))
+ }
+
+ // Distribute shards across destinations
+ var wg sync.WaitGroup
+ errorChan := make(chan error, len(destinations))
+
+ // Track which disks have already received metadata files (server+disk)
+ metadataFilesCopied := make(map[string]bool)
+ var metadataMutex sync.Mutex
+
+ // For each destination, send a subset of shards
+ shardsPerDest := len(shardIds) / len(destinations)
+ remainder := len(shardIds) % len(destinations)
+
+ shardOffset := 0
+ for i, dest := range destinations {
+ wg.Add(1)
+
+ shardsForThisDest := shardsPerDest
+ if i < remainder {
+ shardsForThisDest++ // Distribute remainder shards
+ }
+
+ destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
+ shardOffset += shardsForThisDest
+
+ go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
+ defer wg.Done()
+
+ if t.IsCancelled() {
+ errorChan <- fmt.Errorf("task cancelled during shard distribution")
+ return
+ }
+
+ // Create disk-specific metadata key (server+disk)
+ diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
+
+ glog.V(1).Infof("Distributing shards %v from worker to %s (disk %d)",
+ targetShardIds, destination.Node, destination.DiskId)
+
+ // Check if this disk needs metadata files (only once per disk)
+ metadataMutex.Lock()
+ needsMetadataFiles := !metadataFilesCopied[diskKey]
+ if needsMetadataFiles {
+ metadataFilesCopied[diskKey] = true
+ }
+ metadataMutex.Unlock()
+
+ // Send shard files to destination (sendShardsToDestination is currently a logging placeholder)
+ err := t.sendShardsToDestination(destination, targetShardIds, localShardFiles, needsMetadataFiles)
+ if err != nil {
+ errorChan <- fmt.Errorf("failed to send shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
+ return
+ }
+
+ if needsMetadataFiles {
+ glog.V(1).Infof("Successfully distributed shards %v and metadata files (.ecx, .vif) to %s disk %d",
+ targetShardIds, destination.Node, destination.DiskId)
+ } else {
+ glog.V(1).Infof("Successfully distributed shards %v to %s disk %d (metadata files already present)",
+ targetShardIds, destination.Node, destination.DiskId)
+ }
+ }(dest, destShardIds)
+ }
+
+ wg.Wait()
+ close(errorChan)
+
+ // Check for any distribution errors
+ if err := <-errorChan; err != nil {
+ return err
+ }
+
+ glog.V(1).Infof("Completed distributing EC shards for volume %d", t.volumeID)
+ return nil
+}
+
+// sendShardsToDestination sends specific shard files from the worker to a destination server.
+// NOTE: no data is moved yet; this only logs the intended transfer (see the TODO below).
+func (t *Task) sendShardsToDestination(destination *worker_pb.ECDestination, shardIds []uint32, localFiles map[string]string, includeMetadata bool) error {
+ glog.V(2).Infof("Would send shards %v and metadata=%v to %s disk %d", shardIds, includeMetadata, destination.Node, destination.DiskId)
+
+ // TODO: Implement actual file upload to volume server
+ // This is a placeholder - actual implementation would:
+ // 1. Open each shard file locally
+ // 2. Upload via HTTP POST or gRPC stream to destination volume server
+ // 3. Volume server would save to the specified disk_id
+
+ return nil
+}
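
One shape the upload loop in that TODO could take, sketched with a
hypothetical sendChunk callback standing in for whichever gRPC stream or HTTP
POST the volume server ends up exposing:

    // streamShardFile reads one local shard file in fixed-size chunks and
    // hands each chunk to a caller-supplied send function (hypothetical).
    func streamShardFile(path string, sendChunk func([]byte) error) error {
        f, err := os.Open(path)
        if err != nil {
            return fmt.Errorf("open shard %s: %w", path, err)
        }
        defer f.Close()
        buf := make([]byte, 1024*1024) // 1 MiB chunks
        for {
            n, err := f.Read(buf)
            if n > 0 {
                if sendErr := sendChunk(buf[:n]); sendErr != nil {
                    return sendErr
                }
            }
            if err == io.EOF {
                return nil
            }
            if err != nil {
                return err
            }
        }
    }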
+
+// mountEcShardsOnDestinations mounts EC shards on all destination servers
+func (t *Task) mountEcShardsOnDestinations() error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for mounting EC shards")
+ }
+
+ destinations := t.destinations
+
+ glog.V(1).Infof("Mounting EC shards for volume %d on %d destinations", t.volumeID, len(destinations))
+
+ // Prepare all shard IDs (0-13)
+ var allShardIds []uint32
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ allShardIds = append(allShardIds, uint32(i))
+ }
+
+ var wg sync.WaitGroup
+ errorChan := make(chan error, len(destinations))
+
+ // Mount shards on each destination server
+ for _, dest := range destinations {
+ wg.Add(1)
+
+ go func(destination *worker_pb.ECDestination) {
+ defer wg.Done()
+
+ if t.IsCancelled() {
+ errorChan <- fmt.Errorf("task cancelled during shard mounting")
+ return
+ }
+
+ glog.V(1).Infof("Mounting EC shards on %s disk %d", destination.Node, destination.DiskId)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+ VolumeId: uint32(t.volumeID),
+ Collection: t.collection,
+ ShardIds: allShardIds, // Mount all available shards on each server
+ })
+ return mountErr
+ })
+
+ if err != nil {
+ // It's normal for some servers to not have all shards, so log as warning rather than error
+ glog.Warningf("Failed to mount some shards on %s disk %d (this may be normal): %v", destination.Node, destination.DiskId, err)
+ } else {
+ glog.V(1).Infof("Successfully mounted EC shards on %s disk %d", destination.Node, destination.DiskId)
+ }
+ }(dest)
+ }
+
+ wg.Wait()
+ close(errorChan)
+
+ // Surface any cancellation errors (mount failures above are only logged as warnings)
+ select {
+ case err := <-errorChan:
+ if err != nil {
+ glog.Warningf("Some shard mounting issues occurred: %v", err)
+ }
+ default:
+ // No errors
+ }
+
+ glog.V(1).Infof("Completed mounting EC shards for volume %d", t.volumeID)
+ return nil
+}
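
Because every destination is asked to mount all 14 shard IDs, servers holding
only a subset will fail for the missing ones, which is why mount failures are
downgraded to warnings. A stricter variant, assuming the per-destination
assignment from the copy step were kept (destShards below is hypothetical
bookkeeping), would mount only what each server received and could treat
failures as real errors:

    for node, ids := range destShards { // map[string][]uint32, hypothetical
        err := operation.WithVolumeServerClient(false, pb.ServerAddress(node), t.grpcDialOpt,
            func(client volume_server_pb.VolumeServerClient) error {
                _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
                    VolumeId:   uint32(t.volumeID),
                    Collection: t.collection,
                    ShardIds:   ids,
                })
                return mountErr
            })
        if err != nil {
            return fmt.Errorf("mount shards %v on %s: %w", ids, node, err)
        }
    }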
+
+// deleteVolumeFromAllLocations deletes the original volume from all replica servers
+func (t *Task) deleteVolumeFromAllLocations(volumeId needle.VolumeId, volumeLocations []string) error {
+ glog.V(1).Infof("Deleting original volume %d from %d locations", volumeId, len(volumeLocations))
+
+ for _, location := range volumeLocations {
+ glog.V(1).Infof("Deleting volume %d from %s", volumeId, location)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
+ VolumeId: uint32(volumeId),
+ OnlyEmpty: false, // Force delete even if not empty since we've already created EC shards
+ })
+ return deleteErr
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to delete volume %d from %s: %v", volumeId, location, err)
+ return fmt.Errorf("failed to delete volume %d from %s: %v", volumeId, location, err)
+ }
+
+ glog.V(1).Infof("Successfully deleted volume %d from %s", volumeId, location)
+ }
+
+ glog.V(1).Infof("Successfully deleted volume %d from all %d locations", volumeId, len(volumeLocations))
+ return nil
+}
+
+// Register the task in the global registry
+func init() {
+ types.RegisterGlobalTypedTask(types.TaskTypeErasureCoding, NewTask)
+ glog.V(1).Infof("Registered EC task")
}
diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go
deleted file mode 100644
index 0f8b5e376..000000000
--- a/weed/worker/tasks/erasure_coding/ec_detector.go
+++ /dev/null
@@ -1,139 +0,0 @@
-package erasure_coding
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// EcDetector implements erasure coding task detection
-type EcDetector struct {
- enabled bool
- volumeAgeHours int
- fullnessRatio float64
- scanInterval time.Duration
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*EcDetector)(nil)
-)
-
-// NewEcDetector creates a new erasure coding detector
-func NewEcDetector() *EcDetector {
- return &EcDetector{
- enabled: false, // Conservative default
- volumeAgeHours: 24 * 7, // 1 week
- fullnessRatio: 0.9, // 90% full
- scanInterval: 2 * time.Hour,
- }
-}
-
-// GetTaskType returns the task type
-func (d *EcDetector) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// ScanForTasks scans for volumes that should be converted to erasure coding
-func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- return nil, nil
- }
-
- var results []*types.TaskDetectionResult
- now := time.Now()
- ageThreshold := time.Duration(d.volumeAgeHours) * time.Hour
-
- for _, metric := range volumeMetrics {
- // Skip if already EC volume
- if metric.IsECVolume {
- continue
- }
-
- // Check age and fullness criteria
- if metric.Age >= ageThreshold && metric.FullnessRatio >= d.fullnessRatio {
- // Check if volume is read-only (safe for EC conversion)
- if !metric.IsReadOnly {
- continue
- }
-
- result := &types.TaskDetectionResult{
- TaskType: types.TaskTypeErasureCoding,
- VolumeID: metric.VolumeID,
- Server: metric.Server,
- Collection: metric.Collection,
- Priority: types.TaskPriorityLow, // EC is not urgent
- Reason: "Volume is old and full enough for EC conversion",
- Parameters: map[string]interface{}{
- "age_hours": int(metric.Age.Hours()),
- "fullness_ratio": metric.FullnessRatio,
- },
- ScheduleAt: now,
- }
- results = append(results, result)
- }
- }
-
- glog.V(2).Infof("EC detector found %d tasks to schedule", len(results))
- return results, nil
-}
-
-// ScanInterval returns how often this task type should be scanned
-func (d *EcDetector) ScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// IsEnabled returns whether this task type is enabled
-func (d *EcDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// Configuration setters
-
-func (d *EcDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
-}
-
-func (d *EcDetector) SetVolumeAgeHours(hours int) {
- d.volumeAgeHours = hours
-}
-
-func (d *EcDetector) SetFullnessRatio(ratio float64) {
- d.fullnessRatio = ratio
-}
-
-func (d *EcDetector) SetScanInterval(interval time.Duration) {
- d.scanInterval = interval
-}
-
-// GetVolumeAgeHours returns the current volume age threshold in hours
-func (d *EcDetector) GetVolumeAgeHours() int {
- return d.volumeAgeHours
-}
-
-// GetFullnessRatio returns the current fullness ratio threshold
-func (d *EcDetector) GetFullnessRatio() float64 {
- return d.fullnessRatio
-}
-
-// GetScanInterval returns the scan interval
-func (d *EcDetector) GetScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// ConfigureFromPolicy configures the detector based on the maintenance policy
-func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
- // Type assert to the maintenance policy type we expect
- if maintenancePolicy, ok := policy.(interface {
- GetECEnabled() bool
- GetECVolumeAgeHours() int
- GetECFullnessRatio() float64
- }); ok {
- d.SetEnabled(maintenancePolicy.GetECEnabled())
- d.SetVolumeAgeHours(maintenancePolicy.GetECVolumeAgeHours())
- d.SetFullnessRatio(maintenancePolicy.GetECFullnessRatio())
- } else {
- glog.V(1).Infof("Could not configure EC detector from policy: unsupported policy type")
- }
-}
diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go
index 6c4b5bf7f..62cfe6b56 100644
--- a/weed/worker/tasks/erasure_coding/ec_register.go
+++ b/weed/worker/tasks/erasure_coding/ec_register.go
@@ -2,80 +2,71 @@ package erasure_coding
import (
"fmt"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
-// Factory creates erasure coding task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
+// Global variable to hold the task definition for configuration updates
+var globalTaskDef *base.TaskDefinition
-// NewFactory creates a new erasure coding task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeErasureCoding,
- []string{"erasure_coding", "storage", "durability"},
- "Convert volumes to erasure coded format for improved durability",
- ),
- }
-}
-
-// Create creates a new erasure coding task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID)
- task.SetEstimatedDuration(task.EstimateTime(params))
+// Auto-register this task when the package is imported
+func init() {
+ RegisterErasureCodingTask()
- return task, nil
+ // Register config updater
+ tasks.AutoRegisterConfigUpdater(types.TaskTypeErasureCoding, UpdateConfigFromPersistence)
}
-// Shared detector and scheduler instances
-var (
- sharedDetector *EcDetector
- sharedScheduler *Scheduler
-)
+// RegisterErasureCodingTask registers the erasure coding task with the new architecture
+func RegisterErasureCodingTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*EcDetector, *Scheduler) {
- if sharedDetector == nil {
- sharedDetector = NewEcDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewScheduler()
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeErasureCoding,
+ Name: "erasure_coding",
+ DisplayName: "Erasure Coding",
+ Description: "Applies erasure coding to volumes for data protection",
+ Icon: "fas fa-shield-alt text-success",
+ Capabilities: []string{"erasure_coding", "data_protection"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: nil, // Uses typed task system - see init() in ec.go
+ DetectionFunc: Detection,
+ ScanInterval: 1 * time.Hour,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 1,
+ RepeatInterval: 24 * time.Hour,
}
- return sharedDetector, sharedScheduler
-}
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*EcDetector, *Scheduler) {
- return getSharedInstances()
+ // Store task definition globally for configuration updates
+ globalTaskDef = taskDef
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
-// Auto-register this task when the package is imported
-func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeErasureCoding, factory)
+// UpdateConfigFromPersistence updates the erasure coding configuration from persistence
+func UpdateConfigFromPersistence(configPersistence interface{}) error {
+ if globalTaskDef == nil {
+ return fmt.Errorf("erasure coding task not registered")
+ }
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
+ // Load configuration from persistence
+ newConfig := LoadConfigFromPersistence(configPersistence)
+ if newConfig == nil {
+ return fmt.Errorf("failed to load configuration from persistence")
+ }
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
+ // Update the task definition's config
+ globalTaskDef.Config = newConfig
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ glog.V(1).Infof("Updated erasure coding task configuration from persistence")
+ return nil
}
diff --git a/weed/worker/tasks/erasure_coding/ec_scheduler.go b/weed/worker/tasks/erasure_coding/ec_scheduler.go
deleted file mode 100644
index b2366bb06..000000000
--- a/weed/worker/tasks/erasure_coding/ec_scheduler.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package erasure_coding
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// Scheduler implements erasure coding task scheduling
-type Scheduler struct {
- maxConcurrent int
- enabled bool
-}
-
-// NewScheduler creates a new erasure coding scheduler
-func NewScheduler() *Scheduler {
- return &Scheduler{
- maxConcurrent: 1, // Conservative default
- enabled: false, // Conservative default
- }
-}
-
-// GetTaskType returns the task type
-func (s *Scheduler) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// CanScheduleNow determines if an erasure coding task can be scheduled now
-func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- if !s.enabled {
- return false
- }
-
- // Check if we have available workers
- if len(availableWorkers) == 0 {
- return false
- }
-
- // Count running EC tasks
- runningCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeErasureCoding {
- runningCount++
- }
- }
-
- // Check concurrency limit
- if runningCount >= s.maxConcurrent {
- glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent)
- return false
- }
-
- // Check if any worker can handle EC tasks
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeErasureCoding {
- glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID)
- return true
- }
- }
- }
-
- return false
-}
-
-// GetMaxConcurrent returns the maximum number of concurrent tasks
-func (s *Scheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks
-func (s *Scheduler) GetDefaultRepeatInterval() time.Duration {
- return 24 * time.Hour // Don't repeat EC for 24 hours
-}
-
-// GetPriority returns the priority for this task
-func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
- return types.TaskPriorityLow // EC is not urgent
-}
-
-// WasTaskRecentlyCompleted checks if a similar task was recently completed
-func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool {
- // Don't repeat EC for 24 hours
- interval := 24 * time.Hour
- cutoff := now.Add(-interval)
-
- for _, completedTask := range completedTasks {
- if completedTask.Type == types.TaskTypeErasureCoding &&
- completedTask.VolumeID == task.VolumeID &&
- completedTask.Server == task.Server &&
- completedTask.Status == types.TaskStatusCompleted &&
- completedTask.CompletedAt != nil &&
- completedTask.CompletedAt.After(cutoff) {
- return true
- }
- }
- return false
-}
-
-// IsEnabled returns whether this task type is enabled
-func (s *Scheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// Configuration setters
-
-func (s *Scheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
-}
-
-func (s *Scheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
-}
diff --git a/weed/worker/tasks/erasure_coding/ui.go b/weed/worker/tasks/erasure_coding/ui.go
deleted file mode 100644
index e17cba89a..000000000
--- a/weed/worker/tasks/erasure_coding/ui.go
+++ /dev/null
@@ -1,309 +0,0 @@
-package erasure_coding
-
-import (
- "fmt"
- "html/template"
- "strconv"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// UIProvider provides the UI for erasure coding task configuration
-type UIProvider struct {
- detector *EcDetector
- scheduler *Scheduler
-}
-
-// NewUIProvider creates a new erasure coding UI provider
-func NewUIProvider(detector *EcDetector, scheduler *Scheduler) *UIProvider {
- return &UIProvider{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetTaskType returns the task type
-func (ui *UIProvider) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// GetDisplayName returns the human-readable name
-func (ui *UIProvider) GetDisplayName() string {
- return "Erasure Coding"
-}
-
-// GetDescription returns a description of what this task does
-func (ui *UIProvider) GetDescription() string {
- return "Converts volumes to erasure coded format for improved data durability and fault tolerance"
-}
-
-// GetIcon returns the icon CSS class for this task type
-func (ui *UIProvider) GetIcon() string {
- return "fas fa-shield-alt text-info"
-}
-
-// ErasureCodingConfig represents the erasure coding configuration
-type ErasureCodingConfig struct {
- Enabled bool `json:"enabled"`
- VolumeAgeHoursSeconds int `json:"volume_age_hours_seconds"`
- FullnessRatio float64 `json:"fullness_ratio"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- ShardCount int `json:"shard_count"`
- ParityCount int `json:"parity_count"`
- CollectionFilter string `json:"collection_filter"`
-}
-
-// Helper functions for duration conversion
-func secondsToDuration(seconds int) time.Duration {
- return time.Duration(seconds) * time.Second
-}
-
-func durationToSeconds(d time.Duration) int {
- return int(d.Seconds())
-}
-
-// formatDurationForUser formats seconds as a user-friendly duration string
-func formatDurationForUser(seconds int) string {
- d := secondsToDuration(seconds)
- if d < time.Minute {
- return fmt.Sprintf("%ds", seconds)
- }
- if d < time.Hour {
- return fmt.Sprintf("%.0fm", d.Minutes())
- }
- if d < 24*time.Hour {
- return fmt.Sprintf("%.1fh", d.Hours())
- }
- return fmt.Sprintf("%.1fd", d.Hours()/24)
-}
-
-// RenderConfigForm renders the configuration form HTML
-func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) {
- config := ui.getCurrentECConfig()
-
- // Build form using the FormBuilder helper
- form := types.NewFormBuilder()
-
- // Detection Settings
- form.AddCheckboxField(
- "enabled",
- "Enable Erasure Coding Tasks",
- "Whether erasure coding tasks should be automatically created",
- config.Enabled,
- )
-
- form.AddNumberField(
- "volume_age_hours_seconds",
- "Volume Age Threshold",
- "Only apply erasure coding to volumes older than this duration",
- float64(config.VolumeAgeHoursSeconds),
- true,
- )
-
- form.AddNumberField(
- "scan_interval_seconds",
- "Scan Interval",
- "How often to scan for volumes needing erasure coding",
- float64(config.ScanIntervalSeconds),
- true,
- )
-
- // Scheduling Settings
- form.AddNumberField(
- "max_concurrent",
- "Max Concurrent Tasks",
- "Maximum number of erasure coding tasks that can run simultaneously",
- float64(config.MaxConcurrent),
- true,
- )
-
- // Erasure Coding Parameters
- form.AddNumberField(
- "shard_count",
- "Data Shards",
- "Number of data shards for erasure coding (recommended: 10)",
- float64(config.ShardCount),
- true,
- )
-
- form.AddNumberField(
- "parity_count",
- "Parity Shards",
- "Number of parity shards for erasure coding (recommended: 4)",
- float64(config.ParityCount),
- true,
- )
-
- // Generate organized form sections using Bootstrap components
- html := `
-<div class="row">
- <div class="col-12">
- <div class="card mb-4">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-shield-alt me-2"></i>
- Erasure Coding Configuration
- </h5>
- </div>
- <div class="card-body">
-` + string(form.Build()) + `
- </div>
- </div>
- </div>
-</div>
-
-<div class="row">
- <div class="col-12">
- <div class="card mb-3">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-info-circle me-2"></i>
- Performance Impact
- </h5>
- </div>
- <div class="card-body">
- <div class="alert alert-info" role="alert">
- <h6 class="alert-heading">Important Notes:</h6>
- <p class="mb-2"><strong>Performance:</strong> Erasure coding is CPU and I/O intensive. Consider running during off-peak hours.</p>
- <p class="mb-0"><strong>Durability:</strong> With ` + fmt.Sprintf("%d+%d", config.ShardCount, config.ParityCount) + ` configuration, can tolerate up to ` + fmt.Sprintf("%d", config.ParityCount) + ` shard failures.</p>
- </div>
- </div>
- </div>
- </div>
-</div>`
-
- return template.HTML(html), nil
-}
-
-// ParseConfigForm parses form data into configuration
-func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) {
- config := &ErasureCodingConfig{}
-
- // Parse enabled
- config.Enabled = len(formData["enabled"]) > 0
-
- // Parse volume age hours
- if values, ok := formData["volume_age_hours_seconds"]; ok && len(values) > 0 {
- hours, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid volume age hours: %w", err)
- }
- config.VolumeAgeHoursSeconds = hours
- }
-
- // Parse scan interval
- if values, ok := formData["scan_interval_seconds"]; ok && len(values) > 0 {
- interval, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid scan interval: %w", err)
- }
- config.ScanIntervalSeconds = interval
- }
-
- // Parse max concurrent
- if values, ok := formData["max_concurrent"]; ok && len(values) > 0 {
- maxConcurrent, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid max concurrent: %w", err)
- }
- if maxConcurrent < 1 {
- return nil, fmt.Errorf("max concurrent must be at least 1")
- }
- config.MaxConcurrent = maxConcurrent
- }
-
- // Parse shard count
- if values, ok := formData["shard_count"]; ok && len(values) > 0 {
- shardCount, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid shard count: %w", err)
- }
- if shardCount < 1 {
- return nil, fmt.Errorf("shard count must be at least 1")
- }
- config.ShardCount = shardCount
- }
-
- // Parse parity count
- if values, ok := formData["parity_count"]; ok && len(values) > 0 {
- parityCount, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid parity count: %w", err)
- }
- if parityCount < 1 {
- return nil, fmt.Errorf("parity count must be at least 1")
- }
- config.ParityCount = parityCount
- }
-
- return config, nil
-}
-
-// GetCurrentConfig returns the current configuration
-func (ui *UIProvider) GetCurrentConfig() interface{} {
- return ui.getCurrentECConfig()
-}
-
-// ApplyConfig applies the new configuration
-func (ui *UIProvider) ApplyConfig(config interface{}) error {
- ecConfig, ok := config.(ErasureCodingConfig)
- if !ok {
- return fmt.Errorf("invalid config type, expected ErasureCodingConfig")
- }
-
- // Apply to detector
- if ui.detector != nil {
- ui.detector.SetEnabled(ecConfig.Enabled)
- ui.detector.SetVolumeAgeHours(ecConfig.VolumeAgeHoursSeconds)
- ui.detector.SetScanInterval(secondsToDuration(ecConfig.ScanIntervalSeconds))
- }
-
- // Apply to scheduler
- if ui.scheduler != nil {
- ui.scheduler.SetEnabled(ecConfig.Enabled)
- ui.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent)
- }
-
- glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, age_threshold=%v, max_concurrent=%d, shards=%d+%d",
- ecConfig.Enabled, ecConfig.VolumeAgeHoursSeconds, ecConfig.MaxConcurrent, ecConfig.ShardCount, ecConfig.ParityCount)
-
- return nil
-}
-
-// getCurrentECConfig gets the current configuration from detector and scheduler
-func (ui *UIProvider) getCurrentECConfig() ErasureCodingConfig {
- config := ErasureCodingConfig{
- // Default values (fallback if detectors/schedulers are nil)
- Enabled: true,
- VolumeAgeHoursSeconds: 24 * 3600, // 24 hours in seconds
- ScanIntervalSeconds: 2 * 3600, // 2 hours in seconds
- MaxConcurrent: 1,
- ShardCount: 10,
- ParityCount: 4,
- }
-
- // Get current values from detector
- if ui.detector != nil {
- config.Enabled = ui.detector.IsEnabled()
- config.VolumeAgeHoursSeconds = ui.detector.GetVolumeAgeHours()
- config.ScanIntervalSeconds = durationToSeconds(ui.detector.ScanInterval())
- }
-
- // Get current values from scheduler
- if ui.scheduler != nil {
- config.MaxConcurrent = ui.scheduler.GetMaxConcurrent()
- }
-
- return config
-}
-
-// RegisterUI registers the erasure coding UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *EcDetector, scheduler *Scheduler) {
- uiProvider := NewUIProvider(detector, scheduler)
- uiRegistry.RegisterUI(uiProvider)
-
- glog.V(1).Infof("βœ… Registered erasure coding task UI provider")
-}
diff --git a/weed/worker/tasks/schema_provider.go b/weed/worker/tasks/schema_provider.go
new file mode 100644
index 000000000..4d69556b1
--- /dev/null
+++ b/weed/worker/tasks/schema_provider.go
@@ -0,0 +1,51 @@
+package tasks
+
+import (
+ "sync"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+)
+
+// TaskConfigSchema defines the schema for task configuration
+type TaskConfigSchema struct {
+ config.Schema // Embed common schema functionality
+ TaskName string `json:"task_name"`
+ DisplayName string `json:"display_name"`
+ Description string `json:"description"`
+ Icon string `json:"icon"`
+}
+
+// TaskConfigSchemaProvider is an interface for providing task configuration schemas
+type TaskConfigSchemaProvider interface {
+ GetConfigSchema() *TaskConfigSchema
+}
+
+// schemaRegistry maintains a registry of schema providers by task type
+type schemaRegistry struct {
+ providers map[string]TaskConfigSchemaProvider
+ mutex sync.RWMutex
+}
+
+var globalSchemaRegistry = &schemaRegistry{
+ providers: make(map[string]TaskConfigSchemaProvider),
+}
+
+// RegisterTaskConfigSchema registers a schema provider for a task type
+func RegisterTaskConfigSchema(taskType string, provider TaskConfigSchemaProvider) {
+ globalSchemaRegistry.mutex.Lock()
+ defer globalSchemaRegistry.mutex.Unlock()
+ globalSchemaRegistry.providers[taskType] = provider
+}
+
+// GetTaskConfigSchema returns the schema for the specified task type
+func GetTaskConfigSchema(taskType string) *TaskConfigSchema {
+ globalSchemaRegistry.mutex.RLock()
+ provider, exists := globalSchemaRegistry.providers[taskType]
+ globalSchemaRegistry.mutex.RUnlock()
+
+ if !exists {
+ return nil
+ }
+
+ return provider.GetConfigSchema()
+}
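
A minimal sketch of how a task package could plug into this registry; the
provider type and all values here are illustrative, not taken from any real
task:

    // exampleSchemaProvider is a hypothetical schema provider.
    type exampleSchemaProvider struct{}

    func (p *exampleSchemaProvider) GetConfigSchema() *TaskConfigSchema {
        return &TaskConfigSchema{
            TaskName:    "example",
            DisplayName: "Example Task",
            Description: "Demonstrates schema registration",
            Icon:        "fas fa-cog",
        }
    }

    func init() {
        RegisterTaskConfigSchema("example", &exampleSchemaProvider{})
    }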
diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go
index 482233f60..15369c137 100644
--- a/weed/worker/tasks/task.go
+++ b/weed/worker/tasks/task.go
@@ -2,29 +2,69 @@ package tasks
import (
"context"
+ "fmt"
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// BaseTask provides common functionality for all tasks
type BaseTask struct {
taskType types.TaskType
+ taskID string
progress float64
cancelled bool
mutex sync.RWMutex
startTime time.Time
estimatedDuration time.Duration
+ logger TaskLogger
+ loggerConfig TaskLoggerConfig
+ progressCallback func(float64) // Callback function for progress updates
}
// NewBaseTask creates a new base task
func NewBaseTask(taskType types.TaskType) *BaseTask {
return &BaseTask{
- taskType: taskType,
- progress: 0.0,
- cancelled: false,
+ taskType: taskType,
+ progress: 0.0,
+ cancelled: false,
+ loggerConfig: DefaultTaskLoggerConfig(),
+ }
+}
+
+// NewBaseTaskWithLogger creates a new base task with custom logger configuration
+func NewBaseTaskWithLogger(taskType types.TaskType, loggerConfig TaskLoggerConfig) *BaseTask {
+ return &BaseTask{
+ taskType: taskType,
+ progress: 0.0,
+ cancelled: false,
+ loggerConfig: loggerConfig,
+ }
+}
+
+// InitializeLogger initializes the task logger with task details
+func (t *BaseTask) InitializeLogger(taskID string, workerID string, params types.TaskParams) error {
+ return t.InitializeTaskLogger(taskID, workerID, params)
+}
+
+// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface)
+func (t *BaseTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ t.taskID = taskID
+
+ logger, err := NewTaskLogger(taskID, t.taskType, workerID, params, t.loggerConfig)
+ if err != nil {
+ return fmt.Errorf("failed to initialize task logger: %w", err)
}
+
+ t.logger = logger
+ t.logger.Info("BaseTask initialized for task %s (type: %s)", taskID, t.taskType)
+
+ return nil
}
// Type returns the task type
@@ -39,24 +79,47 @@ func (t *BaseTask) GetProgress() float64 {
return t.progress
}
-// SetProgress sets the current progress
+// SetProgress sets the current progress and logs it
func (t *BaseTask) SetProgress(progress float64) {
t.mutex.Lock()
- defer t.mutex.Unlock()
if progress < 0 {
progress = 0
}
if progress > 100 {
progress = 100
}
+ oldProgress := t.progress
+ callback := t.progressCallback
t.progress = progress
+ t.mutex.Unlock()
+
+ // Log progress change
+ if t.logger != nil && progress != oldProgress {
+ t.logger.LogProgress(progress, fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress))
+ }
+
+ // Call progress callback if set
+ if callback != nil && progress != oldProgress {
+ callback(progress)
+ }
}
// Cancel cancels the task
func (t *BaseTask) Cancel() error {
t.mutex.Lock()
defer t.mutex.Unlock()
+
+ if t.cancelled {
+ return nil
+ }
+
t.cancelled = true
+
+ if t.logger != nil {
+ t.logger.LogStatus("cancelled", "Task cancelled by request")
+ t.logger.Warning("Task %s was cancelled", t.taskID)
+ }
+
return nil
}
@@ -72,6 +135,10 @@ func (t *BaseTask) SetStartTime(startTime time.Time) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.startTime = startTime
+
+ if t.logger != nil {
+ t.logger.LogStatus("running", fmt.Sprintf("Task started at %s", startTime.Format(time.RFC3339)))
+ }
}
// GetStartTime returns the task start time
@@ -86,6 +153,13 @@ func (t *BaseTask) SetEstimatedDuration(duration time.Duration) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.estimatedDuration = duration
+
+ if t.logger != nil {
+ t.logger.LogWithFields("INFO", "Estimated duration set", map[string]interface{}{
+ "estimated_duration": duration.String(),
+ "estimated_seconds": duration.Seconds(),
+ })
+ }
}
// GetEstimatedDuration returns the estimated duration
@@ -95,11 +169,115 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration {
return t.estimatedDuration
}
-// ExecuteTask is a wrapper that handles common task execution logic
+// SetProgressCallback sets the progress callback function
+func (t *BaseTask) SetProgressCallback(callback func(float64)) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.progressCallback = callback
+}
+
+// SetLoggerConfig sets the logger configuration for this task
+func (t *BaseTask) SetLoggerConfig(config TaskLoggerConfig) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.loggerConfig = config
+}
+
+// GetLogger returns the task logger
+func (t *BaseTask) GetLogger() TaskLogger {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+ return t.logger
+}
+
+// GetTaskLogger returns the task logger (LoggerProvider interface)
+func (t *BaseTask) GetTaskLogger() TaskLogger {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+ return t.logger
+}
+
+// LogInfo logs an info message
+func (t *BaseTask) LogInfo(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Info(message, args...)
+ }
+}
+
+// LogWarning logs a warning message
+func (t *BaseTask) LogWarning(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Warning(message, args...)
+ }
+}
+
+// LogError logs an error message
+func (t *BaseTask) LogError(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Error(message, args...)
+ }
+}
+
+// LogDebug logs a debug message
+func (t *BaseTask) LogDebug(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Debug(message, args...)
+ }
+}
+
+// LogWithFields logs a message with structured fields
+func (t *BaseTask) LogWithFields(level string, message string, fields map[string]interface{}) {
+ if t.logger != nil {
+ t.logger.LogWithFields(level, message, fields)
+ }
+}
+
+// FinishTask finalizes the task and closes the logger
+func (t *BaseTask) FinishTask(success bool, errorMsg string) error {
+ if t.logger != nil {
+ if success {
+ t.logger.LogStatus("completed", "Task completed successfully")
+ t.logger.Info("Task %s finished successfully", t.taskID)
+ } else {
+ t.logger.LogStatus("failed", fmt.Sprintf("Task failed: %s", errorMsg))
+ t.logger.Error("Task %s failed: %s", t.taskID, errorMsg)
+ }
+
+ // Close logger
+ if err := t.logger.Close(); err != nil {
+ glog.Errorf("Failed to close task logger: %v", err)
+ }
+ }
+
+ return nil
+}
+
+// ExecuteTask is a wrapper that handles common task execution logic with logging
func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error {
+ // Initialize logger if not already done
+ if t.logger == nil {
+ // Generate a temporary task ID if none provided
+ if t.taskID == "" {
+ t.taskID = fmt.Sprintf("task_%d", time.Now().UnixNano())
+ }
+
+ workerID := "unknown"
+ if err := t.InitializeLogger(t.taskID, workerID, params); err != nil {
+ glog.Warningf("Failed to initialize task logger: %v", err)
+ }
+ }
+
t.SetStartTime(time.Now())
t.SetProgress(0)
+ if t.logger != nil {
+ t.logger.LogWithFields("INFO", "Task execution started", map[string]interface{}{
+ "volume_id": params.VolumeID,
+ "server": params.Server,
+ "collection": params.Collection,
+ })
+ }
+
// Create a context that can be cancelled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -114,21 +292,29 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe
// Check cancellation every second
}
}
+ t.LogWarning("Task cancellation detected, cancelling context")
cancel()
}()
// Execute the actual task
+ t.LogInfo("Starting task executor")
err := executor(ctx, params)
if err != nil {
+ t.LogError("Task executor failed: %v", err)
+ t.FinishTask(false, err.Error())
return err
}
if t.IsCancelled() {
+ t.LogWarning("Task was cancelled during execution")
+ t.FinishTask(false, "cancelled")
return context.Canceled
}
t.SetProgress(100)
+ t.LogInfo("Task executor completed successfully")
+ t.FinishTask(true, "")
return nil
}
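
From a task implementation's point of view, Execute now only needs to delegate
to ExecuteTask with the real work as the executor closure; the base handles
logger setup, start time, progress logging, cancellation polling, and
FinishTask. A minimal sketch (MyTask is illustrative):

    type MyTask struct {
        *BaseTask
    }

    func (t *MyTask) Execute(params types.TaskParams) error {
        return t.ExecuteTask(context.Background(), params, func(ctx context.Context, p types.TaskParams) error {
            t.SetProgress(50) // logged and forwarded to any progress callback
            select {
            case <-ctx.Done(): // cancelled via BaseTask.Cancel
                return ctx.Err()
            default:
                return nil
            }
        })
    }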
diff --git a/weed/worker/tasks/task_log_handler.go b/weed/worker/tasks/task_log_handler.go
new file mode 100644
index 000000000..be5f00f12
--- /dev/null
+++ b/weed/worker/tasks/task_log_handler.go
@@ -0,0 +1,230 @@
+package tasks
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+)
+
+// TaskLogHandler handles task log requests from admin server
+type TaskLogHandler struct {
+ baseLogDir string
+}
+
+// NewTaskLogHandler creates a new task log handler
+func NewTaskLogHandler(baseLogDir string) *TaskLogHandler {
+ if baseLogDir == "" {
+ baseLogDir = "/tmp/seaweedfs/task_logs"
+ }
+ return &TaskLogHandler{
+ baseLogDir: baseLogDir,
+ }
+}
+
+// HandleLogRequest processes a task log request and returns the response
+func (h *TaskLogHandler) HandleLogRequest(request *worker_pb.TaskLogRequest) *worker_pb.TaskLogResponse {
+ response := &worker_pb.TaskLogResponse{
+ TaskId: request.TaskId,
+ WorkerId: request.WorkerId,
+ Success: false,
+ }
+
+ // Find the task log directory
+ logDir, err := h.findTaskLogDirectory(request.TaskId)
+ if err != nil {
+ response.ErrorMessage = fmt.Sprintf("Task log directory not found: %v", err)
+ glog.Warningf("Task log request failed for %s: %v", request.TaskId, err)
+ return response
+ }
+
+ // Read metadata if requested
+ if request.IncludeMetadata {
+ metadata, err := h.readTaskMetadata(logDir)
+ if err != nil {
+ response.ErrorMessage = fmt.Sprintf("Failed to read task metadata: %v", err)
+ glog.Warningf("Failed to read metadata for task %s: %v", request.TaskId, err)
+ return response
+ }
+ response.Metadata = metadata
+ }
+
+ // Read log entries
+ logEntries, err := h.readTaskLogEntries(logDir, request)
+ if err != nil {
+ response.ErrorMessage = fmt.Sprintf("Failed to read task logs: %v", err)
+ glog.Warningf("Failed to read logs for task %s: %v", request.TaskId, err)
+ return response
+ }
+
+ response.LogEntries = logEntries
+ response.Success = true
+
+ glog.V(1).Infof("Successfully retrieved %d log entries for task %s", len(logEntries), request.TaskId)
+ return response
+}
+
+// findTaskLogDirectory searches for the task log directory by task ID
+func (h *TaskLogHandler) findTaskLogDirectory(taskID string) (string, error) {
+ entries, err := os.ReadDir(h.baseLogDir)
+ if err != nil {
+ return "", fmt.Errorf("failed to read base log directory: %w", err)
+ }
+
+ // Look for directories that start with the task ID
+ for _, entry := range entries {
+ if entry.IsDir() && strings.HasPrefix(entry.Name(), taskID+"_") {
+ return filepath.Join(h.baseLogDir, entry.Name()), nil
+ }
+ }
+
+ return "", fmt.Errorf("task log directory not found for task ID: %s", taskID)
+}
+
+// readTaskMetadata reads task metadata from the log directory
+func (h *TaskLogHandler) readTaskMetadata(logDir string) (*worker_pb.TaskLogMetadata, error) {
+ metadata, err := GetTaskLogMetadata(logDir)
+ if err != nil {
+ return nil, err
+ }
+
+ // Convert to protobuf metadata
+ pbMetadata := &worker_pb.TaskLogMetadata{
+ TaskId: metadata.TaskID,
+ TaskType: metadata.TaskType,
+ WorkerId: metadata.WorkerID,
+ StartTime: metadata.StartTime.Unix(),
+ Status: metadata.Status,
+ Progress: float32(metadata.Progress),
+ VolumeId: metadata.VolumeID,
+ Server: metadata.Server,
+ Collection: metadata.Collection,
+ LogFilePath: metadata.LogFilePath,
+ CreatedAt: metadata.CreatedAt.Unix(),
+ CustomData: make(map[string]string),
+ }
+
+ // Set end time and duration if available
+ if metadata.EndTime != nil {
+ pbMetadata.EndTime = metadata.EndTime.Unix()
+ }
+ if metadata.Duration != nil {
+ pbMetadata.DurationMs = metadata.Duration.Milliseconds()
+ }
+
+ // Convert custom data
+ for key, value := range metadata.CustomData {
+ if strValue, ok := value.(string); ok {
+ pbMetadata.CustomData[key] = strValue
+ } else {
+ pbMetadata.CustomData[key] = fmt.Sprintf("%v", value)
+ }
+ }
+
+ return pbMetadata, nil
+}
+
+// readTaskLogEntries reads and filters log entries based on the request
+func (h *TaskLogHandler) readTaskLogEntries(logDir string, request *worker_pb.TaskLogRequest) ([]*worker_pb.TaskLogEntry, error) {
+ entries, err := ReadTaskLogs(logDir)
+ if err != nil {
+ return nil, err
+ }
+
+ // Apply filters
+ var filteredEntries []TaskLogEntry
+
+ for _, entry := range entries {
+ // Filter by log level
+ if request.LogLevel != "" && !strings.EqualFold(entry.Level, request.LogLevel) {
+ continue
+ }
+
+ // Filter by time range
+ if request.StartTime > 0 && entry.Timestamp.Unix() < request.StartTime {
+ continue
+ }
+ if request.EndTime > 0 && entry.Timestamp.Unix() > request.EndTime {
+ continue
+ }
+
+ filteredEntries = append(filteredEntries, entry)
+ }
+
+ // Limit entries if requested
+ if request.MaxEntries > 0 && len(filteredEntries) > int(request.MaxEntries) {
+ // Take the most recent entries
+ start := len(filteredEntries) - int(request.MaxEntries)
+ filteredEntries = filteredEntries[start:]
+ }
+
+ // Convert to protobuf entries
+ var pbEntries []*worker_pb.TaskLogEntry
+ for _, entry := range filteredEntries {
+ pbEntry := &worker_pb.TaskLogEntry{
+ Timestamp: entry.Timestamp.Unix(),
+ Level: entry.Level,
+ Message: entry.Message,
+ Fields: make(map[string]string),
+ }
+
+ // Set progress if available
+ if entry.Progress != nil {
+ pbEntry.Progress = float32(*entry.Progress)
+ }
+
+ // Set status if available
+ if entry.Status != nil {
+ pbEntry.Status = *entry.Status
+ }
+
+ // Convert fields
+ for key, value := range entry.Fields {
+ if strValue, ok := value.(string); ok {
+ pbEntry.Fields[key] = strValue
+ } else {
+ pbEntry.Fields[key] = fmt.Sprintf("%v", value)
+ }
+ }
+
+ pbEntries = append(pbEntries, pbEntry)
+ }
+
+ return pbEntries, nil
+}
+
+// ListAvailableTaskLogs returns a list of available task log directories
+func (h *TaskLogHandler) ListAvailableTaskLogs() ([]string, error) {
+ entries, err := os.ReadDir(h.baseLogDir)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read base log directory: %w", err)
+ }
+
+ var taskDirs []string
+ for _, entry := range entries {
+ if entry.IsDir() {
+ taskDirs = append(taskDirs, entry.Name())
+ }
+ }
+
+ return taskDirs, nil
+}
+
+// CleanupOldLogs removes old task logs beyond the specified limit
+func (h *TaskLogHandler) CleanupOldLogs(maxTasks int) error {
+ config := TaskLoggerConfig{
+ BaseLogDir: h.baseLogDir,
+ MaxTasks: maxTasks,
+ }
+
+ // Create a temporary logger to trigger cleanup
+ tempLogger := &FileTaskLogger{
+ config: config,
+ }
+
+ tempLogger.cleanupOldLogs()
+ return nil
+}
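
For example, a caller that wants only the most recent errors for a task, plus
its metadata, might build the request like this (values are illustrative):

    handler := NewTaskLogHandler("") // defaults to /tmp/seaweedfs/task_logs
    resp := handler.HandleLogRequest(&worker_pb.TaskLogRequest{
        TaskId:          "task_1700000000000000000",
        IncludeMetadata: true,
        LogLevel:        "ERROR", // matched case-insensitively
        MaxEntries:      50,      // keeps only the 50 most recent entries
    })
    if !resp.Success {
        glog.Warningf("log fetch failed: %s", resp.ErrorMessage)
    }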
diff --git a/weed/worker/tasks/task_logger.go b/weed/worker/tasks/task_logger.go
new file mode 100644
index 000000000..e9c06c35c
--- /dev/null
+++ b/weed/worker/tasks/task_logger.go
@@ -0,0 +1,432 @@
+package tasks
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TaskLogger provides file-based logging for individual tasks
+type TaskLogger interface {
+ // Log methods
+ Info(message string, args ...interface{})
+ Warning(message string, args ...interface{})
+ Error(message string, args ...interface{})
+ Debug(message string, args ...interface{})
+
+ // Progress and status logging
+ LogProgress(progress float64, message string)
+ LogStatus(status string, message string)
+
+ // Structured logging
+ LogWithFields(level string, message string, fields map[string]interface{})
+
+ // Lifecycle
+ Close() error
+ GetLogDir() string
+}
+
+// LoggerProvider interface for tasks that support logging
+type LoggerProvider interface {
+ InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error
+ GetTaskLogger() TaskLogger
+}
+
+// TaskLoggerConfig holds configuration for task logging
+type TaskLoggerConfig struct {
+ BaseLogDir string
+ MaxTasks int // Maximum number of task logs to keep
+ MaxLogSizeMB int // Maximum log file size in MB
+ EnableConsole bool // Also log to console
+}
+
+// FileTaskLogger implements TaskLogger using file-based logging
+type FileTaskLogger struct {
+ taskID string
+ taskType types.TaskType
+ workerID string
+ logDir string
+ logFile *os.File
+ mutex sync.Mutex
+ config TaskLoggerConfig
+ metadata *TaskLogMetadata
+ closed bool
+}
+
+// TaskLogMetadata contains metadata about the task execution
+type TaskLogMetadata struct {
+ TaskID string `json:"task_id"`
+ TaskType string `json:"task_type"`
+ WorkerID string `json:"worker_id"`
+ StartTime time.Time `json:"start_time"`
+ EndTime *time.Time `json:"end_time,omitempty"`
+ Duration *time.Duration `json:"duration,omitempty"`
+ Status string `json:"status"`
+ Progress float64 `json:"progress"`
+ VolumeID uint32 `json:"volume_id,omitempty"`
+ Server string `json:"server,omitempty"`
+ Collection string `json:"collection,omitempty"`
+ CustomData map[string]interface{} `json:"custom_data,omitempty"`
+ LogFilePath string `json:"log_file_path"`
+ CreatedAt time.Time `json:"created_at"`
+}
+
+// TaskLogEntry represents a single log entry
+type TaskLogEntry struct {
+ Timestamp time.Time `json:"timestamp"`
+ Level string `json:"level"`
+ Message string `json:"message"`
+ Fields map[string]interface{} `json:"fields,omitempty"`
+ Progress *float64 `json:"progress,omitempty"`
+ Status *string `json:"status,omitempty"`
+}
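
Each entry is serialized as one JSON object per line. With Go's default
time.Time encoding (RFC 3339), a progress entry would look roughly like:

    {"timestamp":"2025-01-02T15:04:05.123456789Z","level":"INFO","message":"Progress updated from 0.0% to 50.0%","progress":50}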
+
+// DefaultTaskLoggerConfig returns default configuration
+func DefaultTaskLoggerConfig() TaskLoggerConfig {
+ return TaskLoggerConfig{
+ BaseLogDir: "/data/task_logs", // Use persistent data directory
+ MaxTasks: 100, // Keep last 100 task logs
+ MaxLogSizeMB: 10,
+ EnableConsole: true,
+ }
+}
+
+// NewTaskLogger creates a new file-based task logger
+func NewTaskLogger(taskID string, taskType types.TaskType, workerID string, params types.TaskParams, config TaskLoggerConfig) (TaskLogger, error) {
+ // Create unique directory name with timestamp
+ timestamp := time.Now().Format("20060102_150405")
+ dirName := fmt.Sprintf("%s_%s_%s_%s", taskID, taskType, workerID, timestamp)
+ logDir := filepath.Join(config.BaseLogDir, dirName)
+
+ // Create log directory
+ if err := os.MkdirAll(logDir, 0755); err != nil {
+ return nil, fmt.Errorf("failed to create log directory %s: %w", logDir, err)
+ }
+
+ // Create log file
+ logFilePath := filepath.Join(logDir, "task.log")
+ logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create log file %s: %w", logFilePath, err)
+ }
+
+ // Create metadata
+ metadata := &TaskLogMetadata{
+ TaskID: taskID,
+ TaskType: string(taskType),
+ WorkerID: workerID,
+ StartTime: time.Now(),
+ Status: "started",
+ Progress: 0.0,
+ VolumeID: params.VolumeID,
+ Server: params.Server,
+ Collection: params.Collection,
+ CustomData: make(map[string]interface{}),
+ LogFilePath: logFilePath,
+ CreatedAt: time.Now(),
+ }
+
+ logger := &FileTaskLogger{
+ taskID: taskID,
+ taskType: taskType,
+ workerID: workerID,
+ logDir: logDir,
+ logFile: logFile,
+ config: config,
+ metadata: metadata,
+ closed: false,
+ }
+
+ // Write initial log entry
+ logger.Info("Task logger initialized for %s (type: %s, worker: %s)", taskID, taskType, workerID)
+ logger.LogWithFields("INFO", "Task parameters", map[string]interface{}{
+ "volume_id": params.VolumeID,
+ "server": params.Server,
+ "collection": params.Collection,
+ })
+
+ // Save initial metadata
+ if err := logger.saveMetadata(); err != nil {
+ glog.Warningf("Failed to save initial task metadata: %v", err)
+ }
+
+ // Clean up old task logs
+ go logger.cleanupOldLogs()
+
+ return logger, nil
+}
+
+// Info logs an info message
+func (l *FileTaskLogger) Info(message string, args ...interface{}) {
+ l.log("INFO", message, args...)
+}
+
+// Warning logs a warning message
+func (l *FileTaskLogger) Warning(message string, args ...interface{}) {
+ l.log("WARNING", message, args...)
+}
+
+// Error logs an error message
+func (l *FileTaskLogger) Error(message string, args ...interface{}) {
+ l.log("ERROR", message, args...)
+}
+
+// Debug logs a debug message
+func (l *FileTaskLogger) Debug(message string, args ...interface{}) {
+ l.log("DEBUG", message, args...)
+}
+
+// LogProgress logs task progress
+func (l *FileTaskLogger) LogProgress(progress float64, message string) {
+ l.mutex.Lock()
+ l.metadata.Progress = progress
+ l.mutex.Unlock()
+
+ entry := TaskLogEntry{
+ Timestamp: time.Now(),
+ Level: "INFO",
+ Message: message,
+ Progress: &progress,
+ }
+
+ l.writeLogEntry(entry)
+ l.saveMetadata() // Update metadata with new progress
+}
+
+// LogStatus logs task status change
+func (l *FileTaskLogger) LogStatus(status string, message string) {
+ l.mutex.Lock()
+ l.metadata.Status = status
+ l.mutex.Unlock()
+
+ entry := TaskLogEntry{
+ Timestamp: time.Now(),
+ Level: "INFO",
+ Message: message,
+ Status: &status,
+ }
+
+ l.writeLogEntry(entry)
+ l.saveMetadata() // Update metadata with new status
+}
+
+// LogWithFields logs a message with structured fields
+func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[string]interface{}) {
+ entry := TaskLogEntry{
+ Timestamp: time.Now(),
+ Level: level,
+ Message: message,
+ Fields: fields,
+ }
+
+ l.writeLogEntry(entry)
+}
+
+// Close closes the logger and finalizes metadata
+func (l *FileTaskLogger) Close() error {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+
+ if l.closed {
+ return nil
+ }
+
+ // Finalize metadata
+ endTime := time.Now()
+ duration := endTime.Sub(l.metadata.StartTime)
+ l.metadata.EndTime = &endTime
+ l.metadata.Duration = &duration
+
+ if l.metadata.Status == "started" {
+ l.metadata.Status = "completed"
+ }
+
+ // Save final metadata
+ l.saveMetadata()
+
+ // Close log file
+ if l.logFile != nil {
+ if err := l.logFile.Close(); err != nil {
+ return fmt.Errorf("failed to close log file: %w", err)
+ }
+ }
+
+ l.closed = true
+
+ // Log via glog directly: calling l.Info here would re-acquire the mutex
+ // already held by Close (sync.Mutex is not reentrant) and deadlock, and
+ // the log file is closed at this point anyway.
+ glog.V(1).Infof("Task logger closed for %s", l.taskID)
+
+ return nil
+}
+
+// GetLogDir returns the log directory path
+func (l *FileTaskLogger) GetLogDir() string {
+ return l.logDir
+}
+
+// log is the internal logging method
+func (l *FileTaskLogger) log(level string, message string, args ...interface{}) {
+ formattedMessage := fmt.Sprintf(message, args...)
+
+ entry := TaskLogEntry{
+ Timestamp: time.Now(),
+ Level: level,
+ Message: formattedMessage,
+ }
+
+ l.writeLogEntry(entry)
+}
+
+// writeLogEntry writes a log entry to the file
+func (l *FileTaskLogger) writeLogEntry(entry TaskLogEntry) {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+
+ if l.closed || l.logFile == nil {
+ return
+ }
+
+ // Format as JSON line
+ jsonData, err := json.Marshal(entry)
+ if err != nil {
+ glog.Errorf("Failed to marshal log entry: %v", err)
+ return
+ }
+
+ // Write to file
+ if _, err := l.logFile.WriteString(string(jsonData) + "\n"); err != nil {
+ glog.Errorf("Failed to write log entry: %v", err)
+ return
+ }
+
+ // Flush to disk
+ if err := l.logFile.Sync(); err != nil {
+ glog.Errorf("Failed to sync log file: %v", err)
+ }
+
+ // Also log to console and stderr if enabled
+ if l.config.EnableConsole {
+ // Log to glog with proper call depth to show actual source location
+ // We need depth 3 to skip: writeLogEntry -> log -> Info/Warning/Error calls to reach the original caller
+ formattedMsg := fmt.Sprintf("[TASK-%s] %s: %s", l.taskID, entry.Level, entry.Message)
+ switch entry.Level {
+ case "ERROR":
+ glog.ErrorDepth(3, formattedMsg)
+ case "WARNING":
+ glog.WarningDepth(3, formattedMsg)
+ default: // INFO, DEBUG, etc.
+ glog.InfoDepth(3, formattedMsg)
+ }
+ // Also log to stderr for immediate visibility
+ fmt.Fprintf(os.Stderr, "[TASK-%s] %s: %s\n", l.taskID, entry.Level, entry.Message)
+ }
+}
+
+// saveMetadata saves task metadata to file
+func (l *FileTaskLogger) saveMetadata() error {
+ metadataPath := filepath.Join(l.logDir, "metadata.json")
+
+ data, err := json.MarshalIndent(l.metadata, "", " ")
+ if err != nil {
+ return fmt.Errorf("failed to marshal metadata: %w", err)
+ }
+
+ return os.WriteFile(metadataPath, data, 0644)
+}
+
+// cleanupOldLogs removes old task log directories to maintain the limit
+func (l *FileTaskLogger) cleanupOldLogs() {
+ baseDir := l.config.BaseLogDir
+
+ entries, err := os.ReadDir(baseDir)
+ if err != nil {
+ glog.Warningf("Failed to read log directory %s: %v", baseDir, err)
+ return
+ }
+
+ // Filter for directories only
+ var dirs []os.DirEntry
+ for _, entry := range entries {
+ if entry.IsDir() {
+ dirs = append(dirs, entry)
+ }
+ }
+
+ // If we're under the limit, nothing to clean
+ if len(dirs) <= l.config.MaxTasks {
+ return
+ }
+
+ // Sort by modification time (oldest first)
+ sort.Slice(dirs, func(i, j int) bool {
+ infoI, errI := dirs[i].Info()
+ infoJ, errJ := dirs[j].Info()
+ if errI != nil || errJ != nil {
+ return false
+ }
+ return infoI.ModTime().Before(infoJ.ModTime())
+ })
+
+ // Remove oldest directories
+ numToRemove := len(dirs) - l.config.MaxTasks
+ for i := 0; i < numToRemove; i++ {
+ dirPath := filepath.Join(baseDir, dirs[i].Name())
+ if err := os.RemoveAll(dirPath); err != nil {
+ glog.Warningf("Failed to remove old log directory %s: %v", dirPath, err)
+ } else {
+ glog.V(1).Infof("Cleaned up old task log directory: %s", dirPath)
+ }
+ }
+
+ glog.V(1).Infof("Task log cleanup completed: removed %d old directories", numToRemove)
+}
+
+// GetTaskLogMetadata reads metadata from a task log directory
+func GetTaskLogMetadata(logDir string) (*TaskLogMetadata, error) {
+ metadataPath := filepath.Join(logDir, "metadata.json")
+
+ data, err := os.ReadFile(metadataPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read metadata file: %w", err)
+ }
+
+ var metadata TaskLogMetadata
+ if err := json.Unmarshal(data, &metadata); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
+ }
+
+ return &metadata, nil
+}
+
+// ReadTaskLogs reads all log entries from a task log file
+func ReadTaskLogs(logDir string) ([]TaskLogEntry, error) {
+ logPath := filepath.Join(logDir, "task.log")
+
+ file, err := os.Open(logPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open log file: %w", err)
+ }
+ defer file.Close()
+
+ var entries []TaskLogEntry
+ decoder := json.NewDecoder(file)
+
+ for {
+ var entry TaskLogEntry
+ if err := decoder.Decode(&entry); err != nil {
+ if err == io.EOF {
+ break
+ }
+ return nil, fmt.Errorf("failed to decode log entry: %w", err)
+ }
+ entries = append(entries, entry)
+ }
+
+ return entries, nil
+}
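Illustrative sketch (editor's addition, not part of the patch): replaying a finished task's logs with the two helpers added above. The log directory path is a placeholder; the field names used here (Status on TaskLogMetadata; Timestamp, Level, Message on TaskLogEntry) are the ones that appear in this diff, everything else is an assumption.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)

func main() {
	logDir := "/data/task_logs/vacuum-42" // placeholder; GetLogDir() on the logger returns the real path

	// metadata.json, written by saveMetadata()
	meta, err := tasks.GetTaskLogMetadata(logDir)
	if err != nil {
		fmt.Println("no metadata:", err)
		return
	}
	fmt.Println("status:", meta.Status)

	// task.log, written as one JSON object per line by writeLogEntry()
	entries, err := tasks.ReadTaskLogs(logDir)
	if err != nil {
		fmt.Println("no logs:", err)
		return
	}
	for _, e := range entries {
		fmt.Printf("%s [%s] %s\n", e.Timestamp.Format("15:04:05"), e.Level, e.Message)
	}
}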
diff --git a/weed/worker/tasks/ui_base.go b/weed/worker/tasks/ui_base.go
new file mode 100644
index 000000000..ac22c20c4
--- /dev/null
+++ b/weed/worker/tasks/ui_base.go
@@ -0,0 +1,184 @@
+package tasks
+
+import (
+ "reflect"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// BaseUIProvider provides common UIProvider functionality for all tasks
+type BaseUIProvider struct {
+ taskType types.TaskType
+ displayName string
+ description string
+ icon string
+ schemaFunc func() *TaskConfigSchema
+ configFunc func() types.TaskConfig
+ applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error
+ applyTaskConfigFunc func(config types.TaskConfig) error
+}
+
+// NewBaseUIProvider creates a new base UI provider
+func NewBaseUIProvider(
+ taskType types.TaskType,
+ displayName string,
+ description string,
+ icon string,
+ schemaFunc func() *TaskConfigSchema,
+ configFunc func() types.TaskConfig,
+ applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error,
+ applyTaskConfigFunc func(config types.TaskConfig) error,
+) *BaseUIProvider {
+ return &BaseUIProvider{
+ taskType: taskType,
+ displayName: displayName,
+ description: description,
+ icon: icon,
+ schemaFunc: schemaFunc,
+ configFunc: configFunc,
+ applyTaskPolicyFunc: applyTaskPolicyFunc,
+ applyTaskConfigFunc: applyTaskConfigFunc,
+ }
+}
+
+// GetTaskType returns the task type
+func (ui *BaseUIProvider) GetTaskType() types.TaskType {
+ return ui.taskType
+}
+
+// GetDisplayName returns the human-readable name
+func (ui *BaseUIProvider) GetDisplayName() string {
+ return ui.displayName
+}
+
+// GetDescription returns a description of what this task does
+func (ui *BaseUIProvider) GetDescription() string {
+ return ui.description
+}
+
+// GetIcon returns the icon CSS class for this task type
+func (ui *BaseUIProvider) GetIcon() string {
+ return ui.icon
+}
+
+// GetCurrentConfig returns the current configuration as TaskConfig
+func (ui *BaseUIProvider) GetCurrentConfig() types.TaskConfig {
+ return ui.configFunc()
+}
+
+// ApplyTaskPolicy applies protobuf TaskPolicy configuration
+func (ui *BaseUIProvider) ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return ui.applyTaskPolicyFunc(policy)
+}
+
+// ApplyTaskConfig applies TaskConfig interface configuration
+func (ui *BaseUIProvider) ApplyTaskConfig(config types.TaskConfig) error {
+ return ui.applyTaskConfigFunc(config)
+}
+
+// CommonConfigGetter provides a common pattern for getting current configuration
+type CommonConfigGetter[T any] struct {
+ defaultConfig T
+ detectorFunc func() T
+ schedulerFunc func() T
+}
+
+// NewCommonConfigGetter creates a new common config getter
+func NewCommonConfigGetter[T any](
+ defaultConfig T,
+ detectorFunc func() T,
+ schedulerFunc func() T,
+) *CommonConfigGetter[T] {
+ return &CommonConfigGetter[T]{
+ defaultConfig: defaultConfig,
+ detectorFunc: detectorFunc,
+ schedulerFunc: schedulerFunc,
+ }
+}
+
+// GetConfig returns the merged configuration
+func (cg *CommonConfigGetter[T]) GetConfig() T {
+ config := cg.defaultConfig
+
+ // Apply detector values if available
+ if cg.detectorFunc != nil {
+ detectorConfig := cg.detectorFunc()
+ mergeConfigs(&config, detectorConfig)
+ }
+
+ // Apply scheduler values if available
+ if cg.schedulerFunc != nil {
+ schedulerConfig := cg.schedulerFunc()
+ mergeConfigs(&config, schedulerConfig)
+ }
+
+ return config
+}
+
+// mergeConfigs merges non-zero values from source into dest
+func mergeConfigs[T any](dest *T, source T) {
+ destValue := reflect.ValueOf(dest).Elem()
+ sourceValue := reflect.ValueOf(source)
+
+ if destValue.Kind() != reflect.Struct || sourceValue.Kind() != reflect.Struct {
+ return
+ }
+
+ for i := 0; i < destValue.NumField(); i++ {
+ destField := destValue.Field(i)
+ sourceField := sourceValue.Field(i)
+
+ if !destField.CanSet() {
+ continue
+ }
+
+ // Only copy non-zero values
+ if !sourceField.IsZero() {
+ if destField.Type() == sourceField.Type() {
+ destField.Set(sourceField)
+ }
+ }
+ }
+}
+
+// RegisterUIFunc provides a common registration function signature
+type RegisterUIFunc[D, S any] func(uiRegistry *types.UIRegistry, detector D, scheduler S)
+
+// CommonRegisterUI provides a common registration implementation
+func CommonRegisterUI[D, S any](
+ taskType types.TaskType,
+ displayName string,
+ uiRegistry *types.UIRegistry,
+ detector D,
+ scheduler S,
+ schemaFunc func() *TaskConfigSchema,
+ configFunc func() types.TaskConfig,
+ applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error,
+ applyTaskConfigFunc func(config types.TaskConfig) error,
+) {
+ // Get metadata from schema
+ schema := schemaFunc()
+	// Fallbacks, used when the schema is missing or leaves these blank
+	description := "Task configuration"
+	icon := "fas fa-cog"
+
+	if schema != nil {
+		if schema.Description != "" {
+			description = schema.Description
+		}
+		if schema.Icon != "" {
+			icon = schema.Icon
+		}
+	}
+
+ uiProvider := NewBaseUIProvider(
+ taskType,
+ displayName,
+ description,
+ icon,
+ schemaFunc,
+ configFunc,
+ applyTaskPolicyFunc,
+ applyTaskConfigFunc,
+ )
+
+ uiRegistry.RegisterUI(uiProvider)
+ glog.V(1).Infof("βœ… Registered %s task UI provider", taskType)
+}
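Illustrative sketch (editor's addition, not part of the patch): the zero-value merge semantics of CommonConfigGetter. The settings struct is invented for the example; the point is that only non-zero detector/scheduler fields override the defaults, which also means a layer cannot force a field back to its zero value (e.g. Enabled=false).

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)

type settings struct {
	Enabled       bool
	MaxConcurrent int
	Threshold     float64
}

func main() {
	defaults := settings{Enabled: true, MaxConcurrent: 2, Threshold: 0.3}
	getter := tasks.NewCommonConfigGetter(
		defaults,
		func() settings { return settings{Threshold: 0.5} },   // detector overrides the threshold only
		func() settings { return settings{MaxConcurrent: 4} }, // scheduler overrides concurrency only
	)
	merged := getter.GetConfig()
	fmt.Printf("%+v\n", merged) // {Enabled:true MaxConcurrent:4 Threshold:0.5}
}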
diff --git a/weed/worker/tasks/vacuum/config.go b/weed/worker/tasks/vacuum/config.go
new file mode 100644
index 000000000..fe8c0e8c5
--- /dev/null
+++ b/weed/worker/tasks/vacuum/config.go
@@ -0,0 +1,190 @@
+package vacuum
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with vacuum-specific settings
+type Config struct {
+ base.BaseConfig
+ GarbageThreshold float64 `json:"garbage_threshold"`
+ MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
+ MinIntervalSeconds int `json:"min_interval_seconds"`
+}
+
+// NewDefaultConfig creates a new default vacuum configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
+ MaxConcurrent: 2,
+ },
+ GarbageThreshold: 0.3, // 30%
+ MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ }
+}
+
+// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
+func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+ VacuumConfig: &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: float64(c.GarbageThreshold),
+ MinVolumeAgeHours: int32(c.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
+ MinIntervalSeconds: int32(c.MinIntervalSeconds),
+ },
+ },
+ }
+}
+
+// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
+func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+
+ // Set general TaskPolicy fields
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
+
+ // Set vacuum-specific fields from the task config
+ if vacuumConfig := policy.GetVacuumConfig(); vacuumConfig != nil {
+ c.GarbageThreshold = float64(vacuumConfig.GarbageThreshold)
+ c.MinVolumeAgeSeconds = int(vacuumConfig.MinVolumeAgeHours * 3600) // Convert hours to seconds
+ c.MinIntervalSeconds = int(vacuumConfig.MinIntervalSeconds)
+ }
+
+ return nil
+}
+
+// LoadConfigFromPersistence loads configuration from the persistence layer if available
+func LoadConfigFromPersistence(configPersistence interface{}) *Config {
+ config := NewDefaultConfig()
+
+ // Try to load from persistence if available
+ if persistence, ok := configPersistence.(interface {
+ LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error)
+ }); ok {
+ if policy, err := persistence.LoadVacuumTaskPolicy(); err == nil && policy != nil {
+ if err := config.FromTaskPolicy(policy); err == nil {
+ glog.V(1).Infof("Loaded vacuum configuration from persistence")
+ return config
+ }
+ }
+ }
+
+ glog.V(1).Infof("Using default vacuum configuration")
+ return config
+}
+
+// GetConfigSpec returns the configuration schema for vacuum tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Vacuum Tasks",
+ Description: "Whether vacuum tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic vacuum task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 2 * 60 * 60,
+ MinValue: 10 * 60,
+ MaxValue: 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing vacuum",
+ HelpText: "The system will check for volumes that need vacuuming at this interval",
+ Placeholder: "2",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 2,
+ MinValue: 1,
+ MaxValue: 10,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of vacuum tasks that can run simultaneously",
+ HelpText: "Limits the number of vacuum operations running at the same time to control system load",
+ Placeholder: "2 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "garbage_threshold",
+ JSONName: "garbage_threshold",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.3,
+ MinValue: 0.0,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Garbage Percentage Threshold",
+ Description: "Trigger vacuum when garbage ratio exceeds this percentage",
+ HelpText: "Volumes with more deleted content than this threshold will be vacuumed",
+ Placeholder: "0.30 (30%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_volume_age_seconds",
+ JSONName: "min_volume_age_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 24 * 60 * 60,
+ MinValue: 1 * 60 * 60,
+ MaxValue: 7 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Volume Age",
+ Description: "Only vacuum volumes older than this duration",
+ HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to",
+ Placeholder: "24",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_interval_seconds",
+ JSONName: "min_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 7 * 24 * 60 * 60,
+ MinValue: 1 * 24 * 60 * 60,
+ MaxValue: 30 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Interval",
+ Description: "Minimum time between vacuum operations on the same volume",
+ HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
+ Placeholder: "7",
+ Unit: config.UnitDays,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
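Illustrative sketch (editor's addition, not part of the patch): round-tripping the vacuum config through its protobuf policy. Because ToTaskPolicy stores the volume age as whole hours, sub-hour precision is silently truncated on the way back.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

func main() {
	cfg := vacuum.NewDefaultConfig()
	cfg.MinVolumeAgeSeconds = 90 * 60 // 1.5 hours

	policy := cfg.ToTaskPolicy() // MinVolumeAgeHours becomes 1 (5400 / 3600, integer division)

	restored := vacuum.NewDefaultConfig()
	if err := restored.FromTaskPolicy(policy); err != nil {
		panic(err)
	}
	fmt.Println(restored.MinVolumeAgeSeconds) // 3600, not 5400
}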
diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go
new file mode 100644
index 000000000..7b5a1baf0
--- /dev/null
+++ b/weed/worker/tasks/vacuum/detection.go
@@ -0,0 +1,112 @@
+package vacuum
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for vacuum tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+	vacuumConfig, ok := config.(*Config)
+	if !ok {
+		glog.Errorf("VACUUM: unexpected config type %T, skipping detection", config)
+		return nil, nil
+	}
+ var results []*types.TaskDetectionResult
+ minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
+
+	skippedDueToGarbage := 0
+	skippedDueToAge := 0
+
+ for _, metric := range metrics {
+ // Check if volume needs vacuum
+ if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
+ priority := types.TaskPriorityNormal
+ if metric.GarbageRatio > 0.6 {
+ priority = types.TaskPriorityHigh
+ }
+
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeVacuum,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: priority,
+ Reason: "Volume has excessive garbage requiring vacuum",
+ ScheduleAt: time.Now(),
+ }
+ results = append(results, result)
+		} else {
+			// Track why the volume was skipped so the summary below reports accurate totals
+			if metric.GarbageRatio < vacuumConfig.GarbageThreshold {
+				skippedDueToGarbage++
+			}
+			if metric.Age < minVolumeAge {
+				skippedDueToAge++
+			}
+		}
+ }
+
+ // Log debug summary if no tasks were created
+ if len(results) == 0 && len(metrics) > 0 {
+ totalVolumes := len(metrics)
+ glog.Infof("VACUUM: No tasks created for %d volumes. Threshold=%.2f%%, MinAge=%s. Skipped: %d (garbage<threshold), %d (age<minimum)",
+ totalVolumes, vacuumConfig.GarbageThreshold*100, minVolumeAge, skippedDueToGarbage, skippedDueToAge)
+
+ // Show details for first few volumes
+ for i, metric := range metrics {
+ if i >= 3 { // Limit to first 3 volumes
+ break
+ }
+ glog.Infof("VACUUM: Volume %d: garbage=%.2f%% (need β‰₯%.2f%%), age=%s (need β‰₯%s)",
+ metric.VolumeID, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100,
+ metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute))
+ }
+ }
+
+ return results, nil
+}
+
+// Scheduling implements the scheduling logic for vacuum tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+	vacuumConfig, ok := config.(*Config)
+	if !ok {
+		glog.Errorf("VACUUM: unexpected config type %T, cannot schedule", config)
+		return false
+	}
+
+ // Count running vacuum tasks
+ runningVacuumCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeVacuum {
+ runningVacuumCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningVacuumCount >= vacuumConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check for available workers with vacuum capability
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeVacuum {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// CreateTask creates a new vacuum task instance
+func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Create and return the vacuum task using existing Task type
+ return NewTask(params.Server, params.VolumeID), nil
+}
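Illustrative sketch (editor's addition, not part of the patch): driving Detection with a synthetic metric. Only the VolumeHealthMetrics fields referenced above (VolumeID, Server, Collection, GarbageRatio, Age) are set; a 65% garbage ratio on a 48-hour-old volume clears the default 30%/24h gates and, being above 0.6, gets high priority.

package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func main() {
	cfg := vacuum.NewDefaultConfig()
	metrics := []*types.VolumeHealthMetrics{
		{VolumeID: 7, Server: "vs1:8080", Collection: "", GarbageRatio: 0.65, Age: 48 * time.Hour},
	}
	results, err := vacuum.Detection(metrics, nil, cfg)
	if err != nil {
		panic(err)
	}
	for _, r := range results {
		fmt.Printf("volume %d -> %s (priority %v)\n", r.VolumeID, r.TaskType, r.Priority)
	}
}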
diff --git a/weed/worker/tasks/vacuum/ui.go b/weed/worker/tasks/vacuum/ui.go
deleted file mode 100644
index 6f67a801a..000000000
--- a/weed/worker/tasks/vacuum/ui.go
+++ /dev/null
@@ -1,314 +0,0 @@
-package vacuum
-
-import (
- "fmt"
- "html/template"
- "strconv"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// UIProvider provides the UI for vacuum task configuration
-type UIProvider struct {
- detector *VacuumDetector
- scheduler *VacuumScheduler
-}
-
-// NewUIProvider creates a new vacuum UI provider
-func NewUIProvider(detector *VacuumDetector, scheduler *VacuumScheduler) *UIProvider {
- return &UIProvider{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetTaskType returns the task type
-func (ui *UIProvider) GetTaskType() types.TaskType {
- return types.TaskTypeVacuum
-}
-
-// GetDisplayName returns the human-readable name
-func (ui *UIProvider) GetDisplayName() string {
- return "Volume Vacuum"
-}
-
-// GetDescription returns a description of what this task does
-func (ui *UIProvider) GetDescription() string {
- return "Reclaims disk space by removing deleted files from volumes"
-}
-
-// GetIcon returns the icon CSS class for this task type
-func (ui *UIProvider) GetIcon() string {
- return "fas fa-broom text-primary"
-}
-
-// VacuumConfig represents the vacuum configuration
-type VacuumConfig struct {
- Enabled bool `json:"enabled"`
- GarbageThreshold float64 `json:"garbage_threshold"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
- MinIntervalSeconds int `json:"min_interval_seconds"`
-}
-
-// Helper functions for duration conversion
-func secondsToDuration(seconds int) time.Duration {
- return time.Duration(seconds) * time.Second
-}
-
-func durationToSeconds(d time.Duration) int {
- return int(d.Seconds())
-}
-
-// formatDurationForUser formats seconds as a user-friendly duration string
-func formatDurationForUser(seconds int) string {
- d := secondsToDuration(seconds)
- if d < time.Minute {
- return fmt.Sprintf("%ds", seconds)
- }
- if d < time.Hour {
- return fmt.Sprintf("%.0fm", d.Minutes())
- }
- if d < 24*time.Hour {
- return fmt.Sprintf("%.1fh", d.Hours())
- }
- return fmt.Sprintf("%.1fd", d.Hours()/24)
-}
-
-// RenderConfigForm renders the configuration form HTML
-func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) {
- config := ui.getCurrentVacuumConfig()
-
- // Build form using the FormBuilder helper
- form := types.NewFormBuilder()
-
- // Detection Settings
- form.AddCheckboxField(
- "enabled",
- "Enable Vacuum Tasks",
- "Whether vacuum tasks should be automatically created",
- config.Enabled,
- )
-
- form.AddNumberField(
- "garbage_threshold",
- "Garbage Threshold (%)",
- "Trigger vacuum when garbage ratio exceeds this percentage (0.0-1.0)",
- config.GarbageThreshold,
- true,
- )
-
- form.AddDurationField(
- "scan_interval",
- "Scan Interval",
- "How often to scan for volumes needing vacuum",
- secondsToDuration(config.ScanIntervalSeconds),
- true,
- )
-
- form.AddDurationField(
- "min_volume_age",
- "Minimum Volume Age",
- "Only vacuum volumes older than this duration",
- secondsToDuration(config.MinVolumeAgeSeconds),
- true,
- )
-
- // Scheduling Settings
- form.AddNumberField(
- "max_concurrent",
- "Max Concurrent Tasks",
- "Maximum number of vacuum tasks that can run simultaneously",
- float64(config.MaxConcurrent),
- true,
- )
-
- form.AddDurationField(
- "min_interval",
- "Minimum Interval",
- "Minimum time between vacuum operations on the same volume",
- secondsToDuration(config.MinIntervalSeconds),
- true,
- )
-
- // Generate organized form sections using Bootstrap components
- html := `
-<div class="row">
- <div class="col-12">
- <div class="card mb-4">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-search me-2"></i>
- Detection Settings
- </h5>
- </div>
- <div class="card-body">
-` + string(form.Build()) + `
- </div>
- </div>
- </div>
-</div>
-
-<script>
-function resetForm() {
- if (confirm('Reset all vacuum settings to defaults?')) {
- // Reset to default values
- document.querySelector('input[name="enabled"]').checked = true;
- document.querySelector('input[name="garbage_threshold"]').value = '0.3';
- document.querySelector('input[name="scan_interval"]').value = '30m';
- document.querySelector('input[name="min_volume_age"]').value = '1h';
- document.querySelector('input[name="max_concurrent"]').value = '2';
- document.querySelector('input[name="min_interval"]').value = '6h';
- }
-}
-</script>
-`
-
- return template.HTML(html), nil
-}
-
-// ParseConfigForm parses form data into configuration
-func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) {
- config := &VacuumConfig{}
-
- // Parse enabled checkbox
- config.Enabled = len(formData["enabled"]) > 0 && formData["enabled"][0] == "on"
-
- // Parse garbage threshold
- if thresholdStr := formData["garbage_threshold"]; len(thresholdStr) > 0 {
- if threshold, err := strconv.ParseFloat(thresholdStr[0], 64); err != nil {
- return nil, fmt.Errorf("invalid garbage threshold: %w", err)
- } else if threshold < 0 || threshold > 1 {
- return nil, fmt.Errorf("garbage threshold must be between 0.0 and 1.0")
- } else {
- config.GarbageThreshold = threshold
- }
- }
-
- // Parse scan interval
- if intervalStr := formData["scan_interval"]; len(intervalStr) > 0 {
- if interval, err := time.ParseDuration(intervalStr[0]); err != nil {
- return nil, fmt.Errorf("invalid scan interval: %w", err)
- } else {
- config.ScanIntervalSeconds = durationToSeconds(interval)
- }
- }
-
- // Parse min volume age
- if ageStr := formData["min_volume_age"]; len(ageStr) > 0 {
- if age, err := time.ParseDuration(ageStr[0]); err != nil {
- return nil, fmt.Errorf("invalid min volume age: %w", err)
- } else {
- config.MinVolumeAgeSeconds = durationToSeconds(age)
- }
- }
-
- // Parse max concurrent
- if concurrentStr := formData["max_concurrent"]; len(concurrentStr) > 0 {
- if concurrent, err := strconv.Atoi(concurrentStr[0]); err != nil {
- return nil, fmt.Errorf("invalid max concurrent: %w", err)
- } else if concurrent < 1 {
- return nil, fmt.Errorf("max concurrent must be at least 1")
- } else {
- config.MaxConcurrent = concurrent
- }
- }
-
- // Parse min interval
- if intervalStr := formData["min_interval"]; len(intervalStr) > 0 {
- if interval, err := time.ParseDuration(intervalStr[0]); err != nil {
- return nil, fmt.Errorf("invalid min interval: %w", err)
- } else {
- config.MinIntervalSeconds = durationToSeconds(interval)
- }
- }
-
- return config, nil
-}
-
-// GetCurrentConfig returns the current configuration
-func (ui *UIProvider) GetCurrentConfig() interface{} {
- return ui.getCurrentVacuumConfig()
-}
-
-// ApplyConfig applies the new configuration
-func (ui *UIProvider) ApplyConfig(config interface{}) error {
- vacuumConfig, ok := config.(*VacuumConfig)
- if !ok {
- return fmt.Errorf("invalid config type, expected *VacuumConfig")
- }
-
- // Apply to detector
- if ui.detector != nil {
- ui.detector.SetEnabled(vacuumConfig.Enabled)
- ui.detector.SetGarbageThreshold(vacuumConfig.GarbageThreshold)
- ui.detector.SetScanInterval(secondsToDuration(vacuumConfig.ScanIntervalSeconds))
- ui.detector.SetMinVolumeAge(secondsToDuration(vacuumConfig.MinVolumeAgeSeconds))
- }
-
- // Apply to scheduler
- if ui.scheduler != nil {
- ui.scheduler.SetEnabled(vacuumConfig.Enabled)
- ui.scheduler.SetMaxConcurrent(vacuumConfig.MaxConcurrent)
- ui.scheduler.SetMinInterval(secondsToDuration(vacuumConfig.MinIntervalSeconds))
- }
-
- glog.V(1).Infof("Applied vacuum configuration: enabled=%v, threshold=%.1f%%, scan_interval=%s, max_concurrent=%d",
- vacuumConfig.Enabled, vacuumConfig.GarbageThreshold*100, formatDurationForUser(vacuumConfig.ScanIntervalSeconds), vacuumConfig.MaxConcurrent)
-
- return nil
-}
-
-// getCurrentVacuumConfig gets the current configuration from detector and scheduler
-func (ui *UIProvider) getCurrentVacuumConfig() *VacuumConfig {
- config := &VacuumConfig{
- // Default values (fallback if detectors/schedulers are nil)
- Enabled: true,
- GarbageThreshold: 0.3,
- ScanIntervalSeconds: 30 * 60,
- MinVolumeAgeSeconds: 1 * 60 * 60,
- MaxConcurrent: 2,
- MinIntervalSeconds: 6 * 60 * 60,
- }
-
- // Get current values from detector
- if ui.detector != nil {
- config.Enabled = ui.detector.IsEnabled()
- config.GarbageThreshold = ui.detector.GetGarbageThreshold()
- config.ScanIntervalSeconds = durationToSeconds(ui.detector.ScanInterval())
- config.MinVolumeAgeSeconds = durationToSeconds(ui.detector.GetMinVolumeAge())
- }
-
- // Get current values from scheduler
- if ui.scheduler != nil {
- config.MaxConcurrent = ui.scheduler.GetMaxConcurrent()
- config.MinIntervalSeconds = durationToSeconds(ui.scheduler.GetMinInterval())
- }
-
- return config
-}
-
-// RegisterUI registers the vacuum UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *VacuumDetector, scheduler *VacuumScheduler) {
- uiProvider := NewUIProvider(detector, scheduler)
- uiRegistry.RegisterUI(uiProvider)
-
- glog.V(1).Infof("βœ… Registered vacuum task UI provider")
-}
-
-// Example: How to get the UI provider for external use
-func GetUIProvider(uiRegistry *types.UIRegistry) *UIProvider {
- provider := uiRegistry.GetProvider(types.TaskTypeVacuum)
- if provider == nil {
- return nil
- }
-
- if vacuumProvider, ok := provider.(*UIProvider); ok {
- return vacuumProvider
- }
-
- return nil
-}
diff --git a/weed/worker/tasks/vacuum/vacuum.go b/weed/worker/tasks/vacuum/vacuum.go
index dbfe35cf8..9cd254958 100644
--- a/weed/worker/tasks/vacuum/vacuum.go
+++ b/weed/worker/tasks/vacuum/vacuum.go
@@ -1,60 +1,184 @@
package vacuum
import (
+ "context"
"fmt"
+ "io"
"time"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
)
// Task implements vacuum operation to reclaim disk space
type Task struct {
*tasks.BaseTask
- server string
- volumeID uint32
+ server string
+ volumeID uint32
+ garbageThreshold float64
}
// NewTask creates a new vacuum task instance
func NewTask(server string, volumeID uint32) *Task {
task := &Task{
- BaseTask: tasks.NewBaseTask(types.TaskTypeVacuum),
- server: server,
- volumeID: volumeID,
+ BaseTask: tasks.NewBaseTask(types.TaskTypeVacuum),
+ server: server,
+ volumeID: volumeID,
+ garbageThreshold: 0.3, // Default 30% threshold
}
return task
}
-// Execute executes the vacuum task
+// Execute performs the vacuum operation
func (t *Task) Execute(params types.TaskParams) error {
- glog.Infof("Starting vacuum task for volume %d on server %s", t.volumeID, t.server)
-
- // Simulate vacuum operation with progress updates
- steps := []struct {
- name string
- duration time.Duration
- progress float64
- }{
- {"Scanning volume", 1 * time.Second, 20},
- {"Identifying deleted files", 2 * time.Second, 50},
- {"Compacting data", 3 * time.Second, 80},
- {"Finalizing vacuum", 1 * time.Second, 100},
+ // Use BaseTask.ExecuteTask to handle logging initialization
+ return t.ExecuteTask(context.Background(), params, t.executeImpl)
+}
+
+// executeImpl is the actual vacuum implementation
+func (t *Task) executeImpl(ctx context.Context, params types.TaskParams) error {
+ t.LogInfo("Starting vacuum for volume %d on server %s", t.volumeID, t.server)
+
+ // Parse garbage threshold from typed parameters
+ if params.TypedParams != nil {
+ if vacuumParams := params.TypedParams.GetVacuumParams(); vacuumParams != nil {
+ t.garbageThreshold = vacuumParams.GarbageThreshold
+ t.LogWithFields("INFO", "Using garbage threshold from parameters", map[string]interface{}{
+ "threshold": t.garbageThreshold,
+ })
+ }
+ }
+
+ // Convert server address to gRPC address and use proper dial option
+ grpcAddress := pb.ServerToGrpcAddress(t.server)
+ var dialOpt grpc.DialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
+ if params.GrpcDialOption != nil {
+ dialOpt = params.GrpcDialOption
+ }
+
+ conn, err := grpc.NewClient(grpcAddress, dialOpt)
+ if err != nil {
+ t.LogError("Failed to connect to volume server %s: %v", t.server, err)
+ return fmt.Errorf("failed to connect to volume server %s: %v", t.server, err)
+ }
+ defer conn.Close()
+
+ client := volume_server_pb.NewVolumeServerClient(conn)
+
+ // Step 1: Check vacuum eligibility
+ t.SetProgress(10.0)
+ t.LogDebug("Checking vacuum eligibility for volume %d", t.volumeID)
+
+ checkResp, err := client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{
+ VolumeId: t.volumeID,
+ })
+ if err != nil {
+ t.LogError("Vacuum check failed for volume %d: %v", t.volumeID, err)
+ return fmt.Errorf("vacuum check failed for volume %d: %v", t.volumeID, err)
+ }
+
+ // Check if garbage ratio meets threshold
+ if checkResp.GarbageRatio < t.garbageThreshold {
+ t.LogWarning("Volume %d garbage ratio %.2f%% is below threshold %.2f%%, skipping vacuum",
+ t.volumeID, checkResp.GarbageRatio*100, t.garbageThreshold*100)
+ return fmt.Errorf("volume %d garbage ratio %.2f%% is below threshold %.2f%%, skipping vacuum",
+ t.volumeID, checkResp.GarbageRatio*100, t.garbageThreshold*100)
+ }
+
+ t.LogWithFields("INFO", "Volume eligible for vacuum", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "garbage_ratio": checkResp.GarbageRatio,
+ "threshold": t.garbageThreshold,
+ "garbage_percent": checkResp.GarbageRatio * 100,
+ })
+
+ // Step 2: Compact volume
+ t.SetProgress(30.0)
+ t.LogInfo("Starting compact for volume %d", t.volumeID)
+
+ compactStream, err := client.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{
+ VolumeId: t.volumeID,
+ })
+ if err != nil {
+ t.LogError("Vacuum compact failed for volume %d: %v", t.volumeID, err)
+ return fmt.Errorf("vacuum compact failed for volume %d: %v", t.volumeID, err)
}
- for _, step := range steps {
- if t.IsCancelled() {
- return fmt.Errorf("vacuum task cancelled")
+ // Process compact stream and track progress
+ var processedBytes int64
+ var totalBytes int64
+
+ for {
+ resp, err := compactStream.Recv()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ t.LogError("Vacuum compact stream error for volume %d: %v", t.volumeID, err)
+ return fmt.Errorf("vacuum compact stream error for volume %d: %v", t.volumeID, err)
}
- glog.V(1).Infof("Vacuum task step: %s", step.name)
- t.SetProgress(step.progress)
+ processedBytes = resp.ProcessedBytes
+		if resp.LoadAvg_1M > 0 {
+			// Rough approximation of total work; only used to scale progress reporting
+			totalBytes = int64(resp.LoadAvg_1M)
+		}
+
+ // Update progress based on processed bytes (30% to 70% of total progress)
+ if totalBytes > 0 {
+ compactProgress := float64(processedBytes) / float64(totalBytes)
+ if compactProgress > 1.0 {
+ compactProgress = 1.0
+ }
+ progress := 30.0 + (compactProgress * 40.0) // 30% to 70%
+ t.SetProgress(progress)
+ }
- // Simulate work
- time.Sleep(step.duration)
+ t.LogWithFields("DEBUG", "Volume compact progress", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "processed_bytes": processedBytes,
+ "total_bytes": totalBytes,
+ "compact_progress": fmt.Sprintf("%.1f%%", (float64(processedBytes)/float64(totalBytes))*100),
+ })
}
- glog.Infof("Vacuum task completed for volume %d on server %s", t.volumeID, t.server)
+ // Step 3: Commit vacuum changes
+ t.SetProgress(80.0)
+ t.LogInfo("Committing vacuum for volume %d", t.volumeID)
+
+ commitResp, err := client.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{
+ VolumeId: t.volumeID,
+ })
+ if err != nil {
+ t.LogError("Vacuum commit failed for volume %d: %v", t.volumeID, err)
+ return fmt.Errorf("vacuum commit failed for volume %d: %v", t.volumeID, err)
+ }
+
+ // Step 4: Cleanup temporary files
+ t.SetProgress(90.0)
+ t.LogInfo("Cleaning up vacuum files for volume %d", t.volumeID)
+
+ _, err = client.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{
+ VolumeId: t.volumeID,
+ })
+ if err != nil {
+ // Log warning but don't fail the task
+ t.LogWarning("Vacuum cleanup warning for volume %d: %v", t.volumeID, err)
+ }
+
+ t.SetProgress(100.0)
+
+ newVolumeSize := commitResp.VolumeSize
+ t.LogWithFields("INFO", "Successfully completed vacuum", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "new_volume_size": newVolumeSize,
+ "garbage_reclaimed": true,
+ })
+
return nil
}
@@ -71,9 +195,20 @@ func (t *Task) Validate(params types.TaskParams) error {
// EstimateTime estimates the time needed for the task
func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
- // Base time for vacuum operation
- baseTime := 25 * time.Second
+ // Base time for vacuum operations - varies by volume size and garbage ratio
+ // Typically vacuum is faster than EC encoding
+ baseTime := 5 * time.Minute
- // Could adjust based on volume size or usage patterns
+ // Use default estimation since volume size is not available in typed params
return baseTime
}
+
+// GetProgress returns the current progress
+func (t *Task) GetProgress() float64 {
+ return t.BaseTask.GetProgress()
+}
+
+// Cancel cancels the task
+func (t *Task) Cancel() error {
+ return t.BaseTask.Cancel()
+}
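Illustrative sketch (editor's addition, not part of the patch): invoking the task directly. A real worker would also populate GrpcDialOption and TypedParams (carrying the garbage threshold); with the bare parameters below, the task falls back to insecure credentials and the default 30% threshold, and fails fast if no volume server answers at the address.

package main

import (
	"log"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func main() {
	task := vacuum.NewTask("vs1:8080", 7)
	params := types.TaskParams{Server: "vs1:8080", VolumeID: 7}
	if err := task.Execute(params); err != nil {
		log.Fatalf("vacuum failed: %v", err) // includes the below-threshold skip case
	}
	log.Printf("vacuum done, progress=%.0f%%", task.GetProgress())
}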
diff --git a/weed/worker/tasks/vacuum/vacuum_detector.go b/weed/worker/tasks/vacuum/vacuum_detector.go
deleted file mode 100644
index 6d7230c6c..000000000
--- a/weed/worker/tasks/vacuum/vacuum_detector.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package vacuum
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// VacuumDetector implements vacuum task detection using code instead of schemas
-type VacuumDetector struct {
- enabled bool
- garbageThreshold float64
- minVolumeAge time.Duration
- scanInterval time.Duration
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*VacuumDetector)(nil)
- _ types.PolicyConfigurableDetector = (*VacuumDetector)(nil)
-)
-
-// NewVacuumDetector creates a new simple vacuum detector
-func NewVacuumDetector() *VacuumDetector {
- return &VacuumDetector{
- enabled: true,
- garbageThreshold: 0.3,
- minVolumeAge: 24 * time.Hour,
- scanInterval: 30 * time.Minute,
- }
-}
-
-// GetTaskType returns the task type
-func (d *VacuumDetector) GetTaskType() types.TaskType {
- return types.TaskTypeVacuum
-}
-
-// ScanForTasks scans for volumes that need vacuum operations
-func (d *VacuumDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- return nil, nil
- }
-
- var results []*types.TaskDetectionResult
-
- for _, metric := range volumeMetrics {
- // Check if volume needs vacuum
- if metric.GarbageRatio >= d.garbageThreshold && metric.Age >= d.minVolumeAge {
- // Higher priority for volumes with more garbage
- priority := types.TaskPriorityNormal
- if metric.GarbageRatio > 0.6 {
- priority = types.TaskPriorityHigh
- }
-
- result := &types.TaskDetectionResult{
- TaskType: types.TaskTypeVacuum,
- VolumeID: metric.VolumeID,
- Server: metric.Server,
- Collection: metric.Collection,
- Priority: priority,
- Reason: "Volume has excessive garbage requiring vacuum",
- Parameters: map[string]interface{}{
- "garbage_ratio": metric.GarbageRatio,
- "volume_age": metric.Age.String(),
- },
- ScheduleAt: time.Now(),
- }
- results = append(results, result)
- }
- }
-
- glog.V(2).Infof("Vacuum detector found %d volumes needing vacuum", len(results))
- return results, nil
-}
-
-// ScanInterval returns how often this detector should scan
-func (d *VacuumDetector) ScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// IsEnabled returns whether this detector is enabled
-func (d *VacuumDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// Configuration setters
-
-func (d *VacuumDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
-}
-
-func (d *VacuumDetector) SetGarbageThreshold(threshold float64) {
- d.garbageThreshold = threshold
-}
-
-func (d *VacuumDetector) SetScanInterval(interval time.Duration) {
- d.scanInterval = interval
-}
-
-func (d *VacuumDetector) SetMinVolumeAge(age time.Duration) {
- d.minVolumeAge = age
-}
-
-// GetGarbageThreshold returns the current garbage threshold
-func (d *VacuumDetector) GetGarbageThreshold() float64 {
- return d.garbageThreshold
-}
-
-// GetMinVolumeAge returns the minimum volume age
-func (d *VacuumDetector) GetMinVolumeAge() time.Duration {
- return d.minVolumeAge
-}
-
-// GetScanInterval returns the scan interval
-func (d *VacuumDetector) GetScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// ConfigureFromPolicy configures the detector based on the maintenance policy
-func (d *VacuumDetector) ConfigureFromPolicy(policy interface{}) {
- // Type assert to the maintenance policy type we expect
- if maintenancePolicy, ok := policy.(interface {
- GetVacuumEnabled() bool
- GetVacuumGarbageRatio() float64
- }); ok {
- d.SetEnabled(maintenancePolicy.GetVacuumEnabled())
- d.SetGarbageThreshold(maintenancePolicy.GetVacuumGarbageRatio())
- } else {
- glog.V(1).Infof("Could not configure vacuum detector from policy: unsupported policy type")
- }
-}
diff --git a/weed/worker/tasks/vacuum/vacuum_register.go b/weed/worker/tasks/vacuum/vacuum_register.go
index 7d930a88e..d660c9d42 100644
--- a/weed/worker/tasks/vacuum/vacuum_register.go
+++ b/weed/worker/tasks/vacuum/vacuum_register.go
@@ -2,80 +2,71 @@ package vacuum
import (
"fmt"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
-// Factory creates vacuum task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
+// Global variable to hold the task definition for configuration updates
+var globalTaskDef *base.TaskDefinition
-// NewFactory creates a new vacuum task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeVacuum,
- []string{"vacuum", "storage"},
- "Vacuum operation to reclaim disk space by removing deleted files",
- ),
- }
-}
-
-// Create creates a new vacuum task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID)
- task.SetEstimatedDuration(task.EstimateTime(params))
+// Auto-register this task when the package is imported
+func init() {
+ RegisterVacuumTask()
- return task, nil
+ // Register config updater
+ tasks.AutoRegisterConfigUpdater(types.TaskTypeVacuum, UpdateConfigFromPersistence)
}
-// Shared detector and scheduler instances
-var (
- sharedDetector *VacuumDetector
- sharedScheduler *VacuumScheduler
-)
+// RegisterVacuumTask registers the vacuum task with the new architecture
+func RegisterVacuumTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*VacuumDetector, *VacuumScheduler) {
- if sharedDetector == nil {
- sharedDetector = NewVacuumDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewVacuumScheduler()
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeVacuum,
+ Name: "vacuum",
+ DisplayName: "Volume Vacuum",
+ Description: "Reclaims disk space by removing deleted files from volumes",
+ Icon: "fas fa-broom text-primary",
+ Capabilities: []string{"vacuum", "storage"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: CreateTask,
+ DetectionFunc: Detection,
+ ScanInterval: 2 * time.Hour,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 2,
+ RepeatInterval: 7 * 24 * time.Hour,
}
- return sharedDetector, sharedScheduler
-}
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*VacuumDetector, *VacuumScheduler) {
- return getSharedInstances()
+ // Store task definition globally for configuration updates
+ globalTaskDef = taskDef
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
-// Auto-register this task when the package is imported
-func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeVacuum, factory)
+// UpdateConfigFromPersistence updates the vacuum configuration from persistence
+func UpdateConfigFromPersistence(configPersistence interface{}) error {
+ if globalTaskDef == nil {
+ return fmt.Errorf("vacuum task not registered")
+ }
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
+ // Load configuration from persistence
+ newConfig := LoadConfigFromPersistence(configPersistence)
+ if newConfig == nil {
+ return fmt.Errorf("failed to load configuration from persistence")
+ }
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
+ // Update the task definition's config
+ globalTaskDef.Config = newConfig
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ glog.V(1).Infof("Updated vacuum task configuration from persistence")
+ return nil
}
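Illustrative sketch (editor's addition, not part of the patch): exercising the persistence hook wired up in init(). The fake persistence type is invented; it only needs the LoadVacuumTaskPolicy method that LoadConfigFromPersistence probes for via a type assertion.

package main

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

type fakePersistence struct{}

func (fakePersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) {
	return &worker_pb.TaskPolicy{Enabled: true, MaxConcurrent: 4}, nil
}

func main() {
	// Importing the vacuum package already ran init() -> RegisterVacuumTask().
	if err := vacuum.UpdateConfigFromPersistence(fakePersistence{}); err != nil {
		panic(err)
	}
}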
diff --git a/weed/worker/tasks/vacuum/vacuum_scheduler.go b/weed/worker/tasks/vacuum/vacuum_scheduler.go
deleted file mode 100644
index 2b67a9f40..000000000
--- a/weed/worker/tasks/vacuum/vacuum_scheduler.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package vacuum
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// VacuumScheduler implements vacuum task scheduling using code instead of schemas
-type VacuumScheduler struct {
- enabled bool
- maxConcurrent int
- minInterval time.Duration
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskScheduler = (*VacuumScheduler)(nil)
-)
-
-// NewVacuumScheduler creates a new simple vacuum scheduler
-func NewVacuumScheduler() *VacuumScheduler {
- return &VacuumScheduler{
- enabled: true,
- maxConcurrent: 2,
- minInterval: 6 * time.Hour,
- }
-}
-
-// GetTaskType returns the task type
-func (s *VacuumScheduler) GetTaskType() types.TaskType {
- return types.TaskTypeVacuum
-}
-
-// CanScheduleNow determines if a vacuum task can be scheduled right now
-func (s *VacuumScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- // Check if scheduler is enabled
- if !s.enabled {
- return false
- }
-
- // Check concurrent limit
- runningVacuumCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeVacuum {
- runningVacuumCount++
- }
- }
-
- if runningVacuumCount >= s.maxConcurrent {
- return false
- }
-
- // Check if there's an available worker with vacuum capability
- for _, worker := range availableWorkers {
- if worker.CurrentLoad < worker.MaxConcurrent {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeVacuum {
- return true
- }
- }
- }
- }
-
- return false
-}
-
-// GetPriority returns the priority for this task
-func (s *VacuumScheduler) GetPriority(task *types.Task) types.TaskPriority {
- // Could adjust priority based on task parameters
- if params, ok := task.Parameters["garbage_ratio"].(float64); ok {
- if params > 0.8 {
- return types.TaskPriorityHigh
- }
- }
- return task.Priority
-}
-
-// GetMaxConcurrent returns max concurrent tasks of this type
-func (s *VacuumScheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating vacuum tasks
-func (s *VacuumScheduler) GetDefaultRepeatInterval() time.Duration {
- return s.minInterval
-}
-
-// IsEnabled returns whether this scheduler is enabled
-func (s *VacuumScheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// Configuration setters
-
-func (s *VacuumScheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
-}
-
-func (s *VacuumScheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
-}
-
-func (s *VacuumScheduler) SetMinInterval(interval time.Duration) {
- s.minInterval = interval
-}
-
-// GetMinInterval returns the minimum interval
-func (s *VacuumScheduler) GetMinInterval() time.Duration {
- return s.minInterval
-}