Diffstat (limited to 'weed/worker/tasks')
33 files changed, 4503 insertions, 2107 deletions
diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go index ea867d950..0becb3415 100644 --- a/weed/worker/tasks/balance/balance.go +++ b/weed/worker/tasks/balance/balance.go @@ -1,6 +1,7 @@ package balance import ( + "context" "fmt" "time" @@ -15,6 +16,9 @@ type Task struct { server string volumeID uint32 collection string + + // Task parameters for accessing planned destinations + taskParams types.TaskParams } // NewTask creates a new balance task instance @@ -30,7 +34,31 @@ func NewTask(server string, volumeID uint32, collection string) *Task { // Execute executes the balance task func (t *Task) Execute(params types.TaskParams) error { - glog.Infof("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection) + // Use BaseTask.ExecuteTask to handle logging initialization + return t.ExecuteTask(context.Background(), params, t.executeImpl) +} + +// executeImpl is the actual balance implementation +func (t *Task) executeImpl(ctx context.Context, params types.TaskParams) error { + // Store task parameters for accessing planned destinations + t.taskParams = params + + // Get planned destination + destNode := t.getPlannedDestination() + if destNode != "" { + t.LogWithFields("INFO", "Starting balance task with planned destination", map[string]interface{}{ + "volume_id": t.volumeID, + "source": t.server, + "destination": destNode, + "collection": t.collection, + }) + } else { + t.LogWithFields("INFO", "Starting balance task without specific destination", map[string]interface{}{ + "volume_id": t.volumeID, + "server": t.server, + "collection": t.collection, + }) + } // Simulate balance operation with progress updates steps := []struct { @@ -46,18 +74,36 @@ func (t *Task) Execute(params types.TaskParams) error { } for _, step := range steps { + select { + case <-ctx.Done(): + t.LogWarning("Balance task cancelled during step: %s", step.name) + return ctx.Err() + default: + } + if t.IsCancelled() { + t.LogWarning("Balance task cancelled by request during step: %s", step.name) return fmt.Errorf("balance task cancelled") } - glog.V(1).Infof("Balance task step: %s", step.name) + t.LogWithFields("INFO", "Executing balance step", map[string]interface{}{ + "step": step.name, + "progress": step.progress, + "duration": step.duration.String(), + "volume_id": t.volumeID, + }) t.SetProgress(step.progress) // Simulate work time.Sleep(step.duration) } - glog.Infof("Balance task completed for volume %d on server %s", t.volumeID, t.server) + t.LogWithFields("INFO", "Balance task completed successfully", map[string]interface{}{ + "volume_id": t.volumeID, + "server": t.server, + "collection": t.collection, + "final_progress": 100.0, + }) return nil } @@ -72,6 +118,19 @@ func (t *Task) Validate(params types.TaskParams) error { return nil } +// getPlannedDestination extracts the planned destination node from task parameters +func (t *Task) getPlannedDestination() string { + if t.taskParams.TypedParams != nil { + if balanceParams := t.taskParams.TypedParams.GetBalanceParams(); balanceParams != nil { + if balanceParams.DestNode != "" { + glog.V(2).Infof("Found planned destination for volume %d: %s", t.volumeID, balanceParams.DestNode) + return balanceParams.DestNode + } + } + } + return "" +} + // EstimateTime estimates the time needed for the task func (t *Task) EstimateTime(params types.TaskParams) time.Duration { // Base time for balance operation diff --git a/weed/worker/tasks/balance/balance_detector.go 
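The executeImpl loop above layers two cancellation checks: a non-blocking select on ctx.Done() plus the task-level IsCancelled() flag, before each progress update. A minimal, self-contained sketch of that pattern using only the standard library (runSteps, step, and the progress callback are illustrative names, not part of this diff):

package main

import (
	"context"
	"fmt"
	"time"
)

type step struct {
	name     string
	duration time.Duration
	progress float64
}

// runSteps mirrors the executeImpl pattern: check for cancellation before
// each step, report progress, then do (here: simulate) the work.
func runSteps(ctx context.Context, steps []step, setProgress func(float64)) error {
	for _, s := range steps {
		select {
		case <-ctx.Done():
			return ctx.Err() // cooperative cancellation, as in executeImpl
		default:
		}
		setProgress(s.progress)
		time.Sleep(s.duration) // stand-in for the real balance work
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	steps := []step{{"copy volume data", time.Second, 50}, {"verify", time.Second, 100}}
	err := runSteps(ctx, steps, func(p float64) { fmt.Printf("progress: %.0f%%\n", p) })
	fmt.Println("done:", err)
}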
b/weed/worker/tasks/balance/balance_detector.go deleted file mode 100644 index f082b7a77..000000000 --- a/weed/worker/tasks/balance/balance_detector.go +++ /dev/null @@ -1,171 +0,0 @@ -package balance - -import ( - "fmt" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// BalanceDetector implements TaskDetector for balance tasks -type BalanceDetector struct { - enabled bool - threshold float64 // Imbalance threshold (0.1 = 10%) - minCheckInterval time.Duration - minVolumeCount int - lastCheck time.Time -} - -// Compile-time interface assertions -var ( - _ types.TaskDetector = (*BalanceDetector)(nil) -) - -// NewBalanceDetector creates a new balance detector -func NewBalanceDetector() *BalanceDetector { - return &BalanceDetector{ - enabled: true, - threshold: 0.1, // 10% imbalance threshold - minCheckInterval: 1 * time.Hour, - minVolumeCount: 10, // Don't balance small clusters - lastCheck: time.Time{}, - } -} - -// GetTaskType returns the task type -func (d *BalanceDetector) GetTaskType() types.TaskType { - return types.TaskTypeBalance -} - -// ScanForTasks checks if cluster balance is needed -func (d *BalanceDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { - if !d.enabled { - return nil, nil - } - - glog.V(2).Infof("Scanning for balance tasks...") - - // Don't check too frequently - if time.Since(d.lastCheck) < d.minCheckInterval { - return nil, nil - } - d.lastCheck = time.Now() - - // Skip if cluster is too small - if len(volumeMetrics) < d.minVolumeCount { - glog.V(2).Infof("Cluster too small for balance (%d volumes < %d minimum)", len(volumeMetrics), d.minVolumeCount) - return nil, nil - } - - // Analyze volume distribution across servers - serverVolumeCounts := make(map[string]int) - for _, metric := range volumeMetrics { - serverVolumeCounts[metric.Server]++ - } - - if len(serverVolumeCounts) < 2 { - glog.V(2).Infof("Not enough servers for balance (%d servers)", len(serverVolumeCounts)) - return nil, nil - } - - // Calculate balance metrics - totalVolumes := len(volumeMetrics) - avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) - - maxVolumes := 0 - minVolumes := totalVolumes - maxServer := "" - minServer := "" - - for server, count := range serverVolumeCounts { - if count > maxVolumes { - maxVolumes = count - maxServer = server - } - if count < minVolumes { - minVolumes = count - minServer = server - } - } - - // Check if imbalance exceeds threshold - imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer - if imbalanceRatio <= d.threshold { - glog.V(2).Infof("Cluster is balanced (imbalance ratio: %.2f <= %.2f)", imbalanceRatio, d.threshold) - return nil, nil - } - - // Create balance task - reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", - imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - - task := &types.TaskDetectionResult{ - TaskType: types.TaskTypeBalance, - Priority: types.TaskPriorityNormal, - Reason: reason, - ScheduleAt: time.Now(), - Parameters: map[string]interface{}{ - "imbalance_ratio": imbalanceRatio, - "threshold": d.threshold, - "max_volumes": maxVolumes, - "min_volumes": minVolumes, - "avg_volumes_per_server": avgVolumesPerServer, - "max_server": maxServer, - "min_server": minServer, - "total_servers": len(serverVolumeCounts), - }, - } - - glog.V(1).Infof("Found balance task: %s", reason) - return []*types.TaskDetectionResult{task}, nil -} - -// ScanInterval returns how often to scan -func (d *BalanceDetector) ScanInterval() time.Duration { - return d.minCheckInterval -} - -// IsEnabled returns whether the detector is enabled -func (d *BalanceDetector) IsEnabled() bool { - return d.enabled -} - -// SetEnabled sets whether the detector is enabled -func (d *BalanceDetector) SetEnabled(enabled bool) { - d.enabled = enabled - glog.V(1).Infof("Balance detector enabled: %v", enabled) -} - -// SetThreshold sets the imbalance threshold -func (d *BalanceDetector) SetThreshold(threshold float64) { - d.threshold = threshold - glog.V(1).Infof("Balance threshold set to: %.1f%%", threshold*100) -} - -// SetMinCheckInterval sets the minimum time between balance checks -func (d *BalanceDetector) SetMinCheckInterval(interval time.Duration) { - d.minCheckInterval = interval - glog.V(1).Infof("Balance check interval set to: %v", interval) -} - -// SetMinVolumeCount sets the minimum volume count for balance operations -func (d *BalanceDetector) SetMinVolumeCount(count int) { - d.minVolumeCount = count - glog.V(1).Infof("Balance minimum volume count set to: %d", count) -} - -// GetThreshold returns the current imbalance threshold -func (d *BalanceDetector) GetThreshold() float64 { - return d.threshold -} - -// GetMinCheckInterval returns the minimum check interval -func (d *BalanceDetector) GetMinCheckInterval() time.Duration { - return d.minCheckInterval -} - -// GetMinVolumeCount returns the minimum volume count -func (d *BalanceDetector) GetMinVolumeCount() int { - return d.minVolumeCount -} diff --git a/weed/worker/tasks/balance/balance_register.go b/weed/worker/tasks/balance/balance_register.go index 7c2d5a520..b26a40782 100644 --- a/weed/worker/tasks/balance/balance_register.go +++ b/weed/worker/tasks/balance/balance_register.go @@ -2,80 +2,71 @@ package balance import ( "fmt" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// Factory creates balance task instances -type Factory struct { - *tasks.BaseTaskFactory -} +// Global variable to hold the task definition for configuration updates +var globalTaskDef *base.TaskDefinition -// NewFactory creates a new balance task factory -func NewFactory() *Factory { - return &Factory{ - BaseTaskFactory: tasks.NewBaseTaskFactory( - types.TaskTypeBalance, - []string{"balance", "storage", "optimization"}, - "Balance data across volume servers for optimal performance", - ), - } -} - -// Create creates a new balance task instance -func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - task := NewTask(params.Server, params.VolumeID, params.Collection) - task.SetEstimatedDuration(task.EstimateTime(params)) +// Auto-register this task when the package is imported +func init() { + RegisterBalanceTask() - return task, nil + // Register config updater + tasks.AutoRegisterConfigUpdater(types.TaskTypeBalance, UpdateConfigFromPersistence) } -// Shared detector and scheduler instances -var ( - sharedDetector *BalanceDetector - sharedScheduler *BalanceScheduler -)
RegisterBalanceTask() { + // Create configuration instance + config := NewDefaultConfig() -// getSharedInstances returns the shared detector and scheduler instances -func getSharedInstances() (*BalanceDetector, *BalanceScheduler) { - if sharedDetector == nil { - sharedDetector = NewBalanceDetector() - } - if sharedScheduler == nil { - sharedScheduler = NewBalanceScheduler() + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeBalance, + Name: "balance", + DisplayName: "Volume Balance", + Description: "Balances volume distribution across servers", + Icon: "fas fa-balance-scale text-warning", + Capabilities: []string{"balance", "distribution"}, + + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: CreateTask, + DetectionFunc: Detection, + ScanInterval: 30 * time.Minute, + SchedulingFunc: Scheduling, + MaxConcurrent: 1, + RepeatInterval: 2 * time.Hour, } - return sharedDetector, sharedScheduler -} -// GetSharedInstances returns the shared detector and scheduler instances (public access) -func GetSharedInstances() (*BalanceDetector, *BalanceScheduler) { - return getSharedInstances() + // Store task definition globally for configuration updates + globalTaskDef = taskDef + + // Register everything with a single function call! + base.RegisterTask(taskDef) } -// Auto-register this task when the package is imported -func init() { - factory := NewFactory() - tasks.AutoRegister(types.TaskTypeBalance, factory) +// UpdateConfigFromPersistence updates the balance configuration from persistence +func UpdateConfigFromPersistence(configPersistence interface{}) error { + if globalTaskDef == nil { + return fmt.Errorf("balance task not registered") + } - // Get shared instances for all registrations - detector, scheduler := getSharedInstances() + // Load configuration from persistence + newConfig := LoadConfigFromPersistence(configPersistence) + if newConfig == nil { + return fmt.Errorf("failed to load configuration from persistence") + } - // Register with types registry - tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { - registry.RegisterTask(detector, scheduler) - }) + // Update the task definition's config + globalTaskDef.Config = newConfig - // Register with UI registry using the same instances - tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { - RegisterUI(uiRegistry, detector, scheduler) - }) + glog.V(1).Infof("Updated balance task configuration from persistence") + return nil } diff --git a/weed/worker/tasks/balance/balance_scheduler.go b/weed/worker/tasks/balance/balance_scheduler.go deleted file mode 100644 index a8fefe465..000000000 --- a/weed/worker/tasks/balance/balance_scheduler.go +++ /dev/null @@ -1,197 +0,0 @@ -package balance - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// BalanceScheduler implements TaskScheduler for balance tasks -type BalanceScheduler struct { - enabled bool - maxConcurrent int - minInterval time.Duration - lastScheduled map[string]time.Time // track when we last scheduled a balance for each task type - minServerCount int - moveDuringOffHours bool - offHoursStart string - offHoursEnd string -} - -// Compile-time interface assertions -var ( - _ types.TaskScheduler = (*BalanceScheduler)(nil) -) - -// NewBalanceScheduler creates a new balance scheduler -func NewBalanceScheduler() *BalanceScheduler { - return &BalanceScheduler{ - enabled: true, - maxConcurrent: 1, // Only run one balance at a time - minInterval: 6 * 
time.Hour, - lastScheduled: make(map[string]time.Time), - minServerCount: 3, - moveDuringOffHours: true, - offHoursStart: "23:00", - offHoursEnd: "06:00", - } -} - -// GetTaskType returns the task type -func (s *BalanceScheduler) GetTaskType() types.TaskType { - return types.TaskTypeBalance -} - -// CanScheduleNow determines if a balance task can be scheduled -func (s *BalanceScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { - if !s.enabled { - return false - } - - // Count running balance tasks - runningBalanceCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeBalance { - runningBalanceCount++ - } - } - - // Check concurrency limit - if runningBalanceCount >= s.maxConcurrent { - glog.V(3).Infof("Balance task blocked: too many running (%d >= %d)", runningBalanceCount, s.maxConcurrent) - return false - } - - // Check minimum interval between balance operations - if lastTime, exists := s.lastScheduled["balance"]; exists { - if time.Since(lastTime) < s.minInterval { - timeLeft := s.minInterval - time.Since(lastTime) - glog.V(3).Infof("Balance task blocked: too soon (wait %v)", timeLeft) - return false - } - } - - // Check if we have available workers - availableWorkerCount := 0 - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeBalance { - availableWorkerCount++ - break - } - } - } - - if availableWorkerCount == 0 { - glog.V(3).Infof("Balance task blocked: no available workers") - return false - } - - // All checks passed - can schedule - s.lastScheduled["balance"] = time.Now() - glog.V(2).Infof("Balance task can be scheduled (running: %d/%d, workers: %d)", - runningBalanceCount, s.maxConcurrent, availableWorkerCount) - return true -} - -// GetPriority returns the priority for balance tasks -func (s *BalanceScheduler) GetPriority(task *types.Task) types.TaskPriority { - // Balance is typically normal priority - not urgent but important for optimization - return types.TaskPriorityNormal -} - -// GetMaxConcurrent returns the maximum concurrent balance tasks -func (s *BalanceScheduler) GetMaxConcurrent() int { - return s.maxConcurrent -} - -// GetDefaultRepeatInterval returns the default interval to wait before repeating balance tasks -func (s *BalanceScheduler) GetDefaultRepeatInterval() time.Duration { - return s.minInterval -} - -// IsEnabled returns whether the scheduler is enabled -func (s *BalanceScheduler) IsEnabled() bool { - return s.enabled -} - -// SetEnabled sets whether the scheduler is enabled -func (s *BalanceScheduler) SetEnabled(enabled bool) { - s.enabled = enabled - glog.V(1).Infof("Balance scheduler enabled: %v", enabled) -} - -// SetMaxConcurrent sets the maximum concurrent balance tasks -func (s *BalanceScheduler) SetMaxConcurrent(max int) { - s.maxConcurrent = max - glog.V(1).Infof("Balance max concurrent set to: %d", max) -} - -// SetMinInterval sets the minimum interval between balance operations -func (s *BalanceScheduler) SetMinInterval(interval time.Duration) { - s.minInterval = interval - glog.V(1).Infof("Balance minimum interval set to: %v", interval) -} - -// GetLastScheduled returns when we last scheduled this task type -func (s *BalanceScheduler) GetLastScheduled(taskKey string) time.Time { - if lastTime, exists := s.lastScheduled[taskKey]; exists { - return lastTime - } - return time.Time{} -} - -// SetLastScheduled updates when we last scheduled this task type -func (s *BalanceScheduler) SetLastScheduled(taskKey string, when time.Time) { - s.lastScheduled[taskKey] = when -} - -// GetMinServerCount returns the minimum server count -func (s *BalanceScheduler) GetMinServerCount() int { - return s.minServerCount -} - -// SetMinServerCount sets the minimum server count -func (s *BalanceScheduler) SetMinServerCount(count int) { - s.minServerCount = count - glog.V(1).Infof("Balance minimum server count set to: %d", count) -} - -// GetMoveDuringOffHours returns whether to move only during off-hours -func (s *BalanceScheduler) GetMoveDuringOffHours() bool { - return s.moveDuringOffHours -} - -// SetMoveDuringOffHours sets whether to move only during off-hours -func (s *BalanceScheduler) SetMoveDuringOffHours(enabled bool) { - s.moveDuringOffHours = enabled - glog.V(1).Infof("Balance move during off-hours: %v", enabled) -} - -// GetOffHoursStart returns the off-hours start time -func (s *BalanceScheduler) GetOffHoursStart() string { - return s.offHoursStart -} - -// SetOffHoursStart sets the off-hours start time -func (s *BalanceScheduler) SetOffHoursStart(start string) { - s.offHoursStart = start - glog.V(1).Infof("Balance off-hours start time set to: %s", start) -} - -// GetOffHoursEnd returns the off-hours end time -func (s *BalanceScheduler) GetOffHoursEnd() string { - return s.offHoursEnd -} - -// SetOffHoursEnd sets the off-hours end time -func (s *BalanceScheduler) SetOffHoursEnd(end string) { - s.offHoursEnd = end - glog.V(1).Infof("Balance off-hours end time set to: %s", end) -} - -// GetMinInterval returns the minimum interval -func (s *BalanceScheduler) GetMinInterval() time.Duration { - return s.minInterval -}
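The deleted scheduler carried an off-hours window (moveDuringOffHours, offHoursStart "23:00", offHoursEnd "06:00"), but the excerpt shown never evaluates it. A sketch of how such a check could work, handling windows that wrap past midnight; isOffHours and its "HH:MM" parsing are an assumption, not code from this commit:

package main

import (
	"fmt"
	"time"
)

// isOffHours reports whether now falls inside a start-end window given as
// "HH:MM" strings, including windows that wrap midnight (e.g. 23:00-06:00).
func isOffHours(now time.Time, start, end string) bool {
	toMinutes := func(s string) int {
		t, err := time.Parse("15:04", s)
		if err != nil {
			return 0 // malformed input; a real implementation should reject it
		}
		return t.Hour()*60 + t.Minute()
	}
	cur := now.Hour()*60 + now.Minute()
	s, e := toMinutes(start), toMinutes(end)
	if s <= e {
		return cur >= s && cur < e
	}
	return cur >= s || cur < e // window wraps midnight
}

func main() {
	at := time.Date(2024, 1, 1, 2, 30, 0, 0, time.UTC)
	fmt.Println(isOffHours(at, "23:00", "06:00")) // true: 02:30 is inside the window
}

diff --git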
a/weed/worker/tasks/balance/balance_typed.go b/weed/worker/tasks/balance/balance_typed.go new file mode 100644 index 000000000..91cd912f0 --- /dev/null +++ b/weed/worker/tasks/balance/balance_typed.go @@ -0,0 +1,156 @@ +package balance + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// TypedTask implements balance operation with typed protobuf parameters +type TypedTask struct { + *base.BaseTypedTask + + // Task state from protobuf + sourceServer string + destNode string + volumeID uint32 + collection string + estimatedSize uint64 + placementScore float64 + forceMove bool + timeoutSeconds int32 + placementConflicts []string +} + +// NewTypedTask creates a new typed balance task +func NewTypedTask() types.TypedTaskInterface { + task := &TypedTask{ + BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeBalance), + } + return task +} + +// ValidateTyped validates the typed parameters for balance task +func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error { + // Basic validation from base class + if err := t.BaseTypedTask.ValidateTyped(params); err != nil { + return err + } + + // Check that we have balance-specific parameters + balanceParams := params.GetBalanceParams() + if balanceParams == nil { + return fmt.Errorf("balance_params is required for balance task") + } + + // Validate destination node + if balanceParams.DestNode == "" { + return fmt.Errorf("dest_node is required for balance task") + } + + // Validate estimated size + if balanceParams.EstimatedSize == 0 { + return fmt.Errorf("estimated_size must be greater than 0") + } + + // Validate timeout + if balanceParams.TimeoutSeconds <= 0 { + return fmt.Errorf("timeout_seconds must be greater than 0") + } + + return nil +} + +// EstimateTimeTyped estimates the time needed for balance operation based on protobuf parameters +func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration { + balanceParams := params.GetBalanceParams() + if balanceParams != nil { + // Use the timeout from parameters if specified + if balanceParams.TimeoutSeconds > 0 { + return time.Duration(balanceParams.TimeoutSeconds) * time.Second + } + + // Estimate based on volume size (1 minute per GB) + if balanceParams.EstimatedSize > 0 { + gbSize := balanceParams.EstimatedSize / (1024 * 1024 * 1024) + return time.Duration(gbSize) * time.Minute + } + } + + // Default estimation + return 10 * time.Minute +} + +// ExecuteTyped implements the balance operation with typed parameters +func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error { + // Extract basic parameters + t.volumeID = params.VolumeId + t.sourceServer = params.Server + t.collection = params.Collection + + // Extract balance-specific parameters + balanceParams := params.GetBalanceParams() + if balanceParams != nil { + t.destNode = balanceParams.DestNode + t.estimatedSize = balanceParams.EstimatedSize + t.placementScore = balanceParams.PlacementScore + t.forceMove = balanceParams.ForceMove + t.timeoutSeconds = balanceParams.TimeoutSeconds + t.placementConflicts = balanceParams.PlacementConflicts + } + + glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)", + t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize) + + // Log placement information + if t.placementScore > 0 { + glog.V(1).Infof("Placement score: 
%.2f", t.placementScore) + } + if len(t.placementConflicts) > 0 { + glog.V(1).Infof("Placement conflicts: %v", t.placementConflicts) + if !t.forceMove { + return fmt.Errorf("placement conflicts detected and force_move is false: %v", t.placementConflicts) + } + glog.Warningf("Proceeding with balance despite conflicts (force_move=true): %v", t.placementConflicts) + } + + // Simulate balance operation with progress updates + steps := []struct { + name string + duration time.Duration + progress float64 + }{ + {"Analyzing cluster state", 2 * time.Second, 15}, + {"Verifying destination capacity", 1 * time.Second, 25}, + {"Starting volume migration", 1 * time.Second, 35}, + {"Moving volume data", 6 * time.Second, 75}, + {"Updating cluster metadata", 2 * time.Second, 95}, + {"Verifying balance completion", 1 * time.Second, 100}, + } + + for _, step := range steps { + if t.IsCancelled() { + return fmt.Errorf("balance task cancelled during: %s", step.name) + } + + glog.V(1).Infof("Balance task step: %s", step.name) + t.SetProgress(step.progress) + + // Simulate work + time.Sleep(step.duration) + } + + glog.Infof("Typed balance task completed successfully for volume %d: %s -> %s", + t.volumeID, t.sourceServer, t.destNode) + return nil +} + +// Register the typed task in the global registry +func init() { + types.RegisterGlobalTypedTask(types.TaskTypeBalance, NewTypedTask) + glog.V(1).Infof("Registered typed balance task") +} diff --git a/weed/worker/tasks/balance/config.go b/weed/worker/tasks/balance/config.go new file mode 100644 index 000000000..9303b4b2a --- /dev/null +++ b/weed/worker/tasks/balance/config.go @@ -0,0 +1,170 @@ +package balance + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with balance-specific settings +type Config struct { + base.BaseConfig + ImbalanceThreshold float64 `json:"imbalance_threshold"` + MinServerCount int `json:"min_server_count"` +} + +// NewDefaultConfig creates a new default balance configuration +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 30 * 60, // 30 minutes + MaxConcurrent: 1, + }, + ImbalanceThreshold: 0.2, // 20% + MinServerCount: 2, + } +} + +// GetConfigSpec returns the configuration schema for balance tasks +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Balance Tasks", + Description: "Whether balance tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic balance task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 30 * 60, + MinValue: 5 * 60, + MaxValue: 2 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volume distribution imbalances", + HelpText: "The system will check for volume distribution imbalances at this interval", + Placeholder: "30", + Unit: config.UnitMinutes, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 1, + 
MinValue: 1, + MaxValue: 3, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of balance tasks that can run simultaneously", + HelpText: "Limits the number of balance operations running at the same time", + Placeholder: "1 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "imbalance_threshold", + JSONName: "imbalance_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.2, + MinValue: 0.05, + MaxValue: 0.5, + Required: true, + DisplayName: "Imbalance Threshold", + Description: "Minimum imbalance ratio to trigger balancing", + HelpText: "Volume distribution imbalances above this threshold will trigger balancing", + Placeholder: "0.20 (20%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_server_count", + JSONName: "min_server_count", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 2, + MaxValue: 10, + Required: true, + DisplayName: "Minimum Server Count", + Description: "Minimum number of servers required for balancing", + HelpText: "Balancing will only occur if there are at least this many servers", + Placeholder: "2 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + }, + } +} + +// ToTaskPolicy converts configuration to a TaskPolicy protobuf message +func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { + return &worker_pb.TaskPolicy{ + Enabled: c.Enabled, + MaxConcurrent: int32(c.MaxConcurrent), + RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), + CheckIntervalSeconds: int32(c.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{ + BalanceConfig: &worker_pb.BalanceTaskConfig{ + ImbalanceThreshold: float64(c.ImbalanceThreshold), + MinServerCount: int32(c.MinServerCount), + }, + }, + } +} + +// FromTaskPolicy loads configuration from a TaskPolicy protobuf message +func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { + if policy == nil { + return fmt.Errorf("policy is nil") + } + + // Set general TaskPolicy fields + c.Enabled = policy.Enabled + c.MaxConcurrent = int(policy.MaxConcurrent) + c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping + + // Set balance-specific fields from the task config + if balanceConfig := policy.GetBalanceConfig(); balanceConfig != nil { + c.ImbalanceThreshold = float64(balanceConfig.ImbalanceThreshold) + c.MinServerCount = int(balanceConfig.MinServerCount) + } + + return nil +} + +// LoadConfigFromPersistence loads configuration from the persistence layer if available +func LoadConfigFromPersistence(configPersistence interface{}) *Config { + config := NewDefaultConfig() + + // Try to load from persistence if available + if persistence, ok := configPersistence.(interface { + LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error) + }); ok { + if policy, err := persistence.LoadBalanceTaskPolicy(); err == nil && policy != nil { + if err := config.FromTaskPolicy(policy); err == nil { + glog.V(1).Infof("Loaded balance configuration from persistence") + return config + } + } + } + + glog.V(1).Infof("Using default balance configuration") + return config +} diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go new file mode 100644 index 000000000..f4bcf3ca3 --- /dev/null +++ b/weed/worker/tasks/balance/detection.go @@ -0,0 +1,134 @@ +package balance + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + 
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Detection implements the detection logic for balance tasks +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + balanceConfig := config.(*Config) + + // Skip if cluster is too small + minVolumeCount := 2 // More reasonable for small clusters + if len(metrics) < minVolumeCount { + glog.Infof("BALANCE: No tasks created - cluster too small (%d volumes, need β₯%d)", len(metrics), minVolumeCount) + return nil, nil + } + + // Analyze volume distribution across servers + serverVolumeCounts := make(map[string]int) + for _, metric := range metrics { + serverVolumeCounts[metric.Server]++ + } + + if len(serverVolumeCounts) < balanceConfig.MinServerCount { + glog.Infof("BALANCE: No tasks created - too few servers (%d servers, need β₯%d)", len(serverVolumeCounts), balanceConfig.MinServerCount) + return nil, nil + } + + // Calculate balance metrics + totalVolumes := len(metrics) + avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) + + maxVolumes := 0 + minVolumes := totalVolumes + maxServer := "" + minServer := "" + + for server, count := range serverVolumeCounts { + if count > maxVolumes { + maxVolumes = count + maxServer = server + } + if count < minVolumes { + minVolumes = count + minServer = server + } + } + + // Check if imbalance exceeds threshold + imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer + if imbalanceRatio <= balanceConfig.ImbalanceThreshold { + glog.Infof("BALANCE: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). 
Max=%d volumes on %s, Min=%d on %s, Avg=%.1f", + imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + return nil, nil + } + + // Select a volume from the overloaded server for balance + var selectedVolume *types.VolumeHealthMetrics + for _, metric := range metrics { + if metric.Server == maxServer { + selectedVolume = metric + break + } + } + + if selectedVolume == nil { + glog.Warningf("BALANCE: Could not find volume on overloaded server %s", maxServer) + return nil, nil + } + + // Create balance task with volume and destination planning info + reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", + imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + + task := &types.TaskDetectionResult{ + TaskType: types.TaskTypeBalance, + VolumeID: selectedVolume.VolumeID, + Server: selectedVolume.Server, + Collection: selectedVolume.Collection, + Priority: types.TaskPriorityNormal, + Reason: reason, + ScheduleAt: time.Now(), + // TypedParams will be populated by the maintenance integration + // with destination planning information + } + + return []*types.TaskDetectionResult{task}, nil +} + +// Scheduling implements the scheduling logic for balance tasks +func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + balanceConfig := config.(*Config) + + // Count running balance tasks + runningBalanceCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeBalance { + runningBalanceCount++ + } + } + + // Check concurrency limit + if runningBalanceCount >= balanceConfig.MaxConcurrent { + return false + } + + // Check if we have available workers + availableWorkerCount := 0 + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeBalance { + availableWorkerCount++ + break + } + } + } + + return availableWorkerCount > 0 +} + +// CreateTask creates a new balance task instance +func CreateTask(params types.TaskParams) (types.TaskInterface, error) { + // Create and return the balance task using existing Task type + return NewTask(params.Server, params.VolumeID, params.Collection), nil +} diff --git a/weed/worker/tasks/balance/ui.go b/weed/worker/tasks/balance/ui.go deleted file mode 100644 index 2cea20a76..000000000 --- a/weed/worker/tasks/balance/ui.go +++ /dev/null @@ -1,361 +0,0 @@ -package balance - -import ( - "fmt" - "html/template" - "strconv" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// UIProvider provides the UI for balance task configuration -type UIProvider struct { - detector *BalanceDetector - scheduler *BalanceScheduler -} - -// NewUIProvider creates a new balance UI provider -func NewUIProvider(detector *BalanceDetector, scheduler *BalanceScheduler) *UIProvider { - return &UIProvider{ - detector: detector, - scheduler: scheduler, - } -} - -// GetTaskType returns the task type -func (ui *UIProvider) GetTaskType() types.TaskType { - return types.TaskTypeBalance -} - -// GetDisplayName returns the human-readable name -func (ui *UIProvider) GetDisplayName() string { - return "Volume Balance" -} - -// GetDescription returns a description of what this task does -func (ui *UIProvider) GetDescription() string { - return "Redistributes volumes across volume servers to optimize storage utilization and 
performance" -} - -// GetIcon returns the icon CSS class for this task type -func (ui *UIProvider) GetIcon() string { - return "fas fa-balance-scale text-secondary" -} - -// BalanceConfig represents the balance configuration -type BalanceConfig struct { - Enabled bool `json:"enabled"` - ImbalanceThreshold float64 `json:"imbalance_threshold"` - ScanIntervalSeconds int `json:"scan_interval_seconds"` - MaxConcurrent int `json:"max_concurrent"` - MinServerCount int `json:"min_server_count"` - MoveDuringOffHours bool `json:"move_during_off_hours"` - OffHoursStart string `json:"off_hours_start"` - OffHoursEnd string `json:"off_hours_end"` - MinIntervalSeconds int `json:"min_interval_seconds"` -} - -// Helper functions for duration conversion -func secondsToDuration(seconds int) time.Duration { - return time.Duration(seconds) * time.Second -} - -func durationToSeconds(d time.Duration) int { - return int(d.Seconds()) -} - -// formatDurationForUser formats seconds as a user-friendly duration string -func formatDurationForUser(seconds int) string { - d := secondsToDuration(seconds) - if d < time.Minute { - return fmt.Sprintf("%ds", seconds) - } - if d < time.Hour { - return fmt.Sprintf("%.0fm", d.Minutes()) - } - if d < 24*time.Hour { - return fmt.Sprintf("%.1fh", d.Hours()) - } - return fmt.Sprintf("%.1fd", d.Hours()/24) -} - -// RenderConfigForm renders the configuration form HTML -func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) { - config := ui.getCurrentBalanceConfig() - - // Build form using the FormBuilder helper - form := types.NewFormBuilder() - - // Detection Settings - form.AddCheckboxField( - "enabled", - "Enable Balance Tasks", - "Whether balance tasks should be automatically created", - config.Enabled, - ) - - form.AddNumberField( - "imbalance_threshold", - "Imbalance Threshold (%)", - "Trigger balance when storage imbalance exceeds this percentage (0.0-1.0)", - config.ImbalanceThreshold, - true, - ) - - form.AddDurationField("scan_interval", "Scan Interval", "How often to scan for imbalanced volumes", secondsToDuration(config.ScanIntervalSeconds), true) - - // Scheduling Settings - form.AddNumberField( - "max_concurrent", - "Max Concurrent Tasks", - "Maximum number of balance tasks that can run simultaneously", - float64(config.MaxConcurrent), - true, - ) - - form.AddNumberField( - "min_server_count", - "Minimum Server Count", - "Only balance when at least this many servers are available", - float64(config.MinServerCount), - true, - ) - - // Timing Settings - form.AddCheckboxField( - "move_during_off_hours", - "Restrict to Off-Hours", - "Only perform balance operations during off-peak hours", - config.MoveDuringOffHours, - ) - - form.AddTextField( - "off_hours_start", - "Off-Hours Start Time", - "Start time for off-hours window (e.g., 23:00)", - config.OffHoursStart, - false, - ) - - form.AddTextField( - "off_hours_end", - "Off-Hours End Time", - "End time for off-hours window (e.g., 06:00)", - config.OffHoursEnd, - false, - ) - - // Timing constraints - form.AddDurationField("min_interval", "Min Interval", "Minimum time between balance operations", secondsToDuration(config.MinIntervalSeconds), true) - - // Generate organized form sections using Bootstrap components - html := ` -<div class="row"> - <div class="col-12"> - <div class="card mb-4"> - <div class="card-header"> - <h5 class="mb-0"> - <i class="fas fa-balance-scale me-2"></i> - Balance Configuration - </h5> - </div> - <div class="card-body"> -` + string(form.Build()) + ` - </div> - 
</div> - </div> -</div> - -<div class="row"> - <div class="col-12"> - <div class="card mb-3"> - <div class="card-header"> - <h5 class="mb-0"> - <i class="fas fa-exclamation-triangle me-2"></i> - Performance Considerations - </h5> - </div> - <div class="card-body"> - <div class="alert alert-warning" role="alert"> - <h6 class="alert-heading">Important Considerations:</h6> - <p class="mb-2"><strong>Performance:</strong> Volume balancing involves data movement and can impact cluster performance.</p> - <p class="mb-2"><strong>Recommendation:</strong> Enable off-hours restriction to minimize impact on production workloads.</p> - <p class="mb-0"><strong>Safety:</strong> Requires at least ` + fmt.Sprintf("%d", config.MinServerCount) + ` servers to ensure data safety during moves.</p> - </div> - </div> - </div> - </div> -</div>` - - return template.HTML(html), nil -} - -// ParseConfigForm parses form data into configuration -func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) { - config := &BalanceConfig{} - - // Parse enabled - config.Enabled = len(formData["enabled"]) > 0 - - // Parse imbalance threshold - if values, ok := formData["imbalance_threshold"]; ok && len(values) > 0 { - threshold, err := strconv.ParseFloat(values[0], 64) - if err != nil { - return nil, fmt.Errorf("invalid imbalance threshold: %w", err) - } - if threshold < 0 || threshold > 1 { - return nil, fmt.Errorf("imbalance threshold must be between 0.0 and 1.0") - } - config.ImbalanceThreshold = threshold - } - - // Parse scan interval - if values, ok := formData["scan_interval"]; ok && len(values) > 0 { - duration, err := time.ParseDuration(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid scan interval: %w", err) - } - config.ScanIntervalSeconds = int(duration.Seconds()) - } - - // Parse max concurrent - if values, ok := formData["max_concurrent"]; ok && len(values) > 0 { - maxConcurrent, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid max concurrent: %w", err) - } - if maxConcurrent < 1 { - return nil, fmt.Errorf("max concurrent must be at least 1") - } - config.MaxConcurrent = maxConcurrent - } - - // Parse min server count - if values, ok := formData["min_server_count"]; ok && len(values) > 0 { - minServerCount, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid min server count: %w", err) - } - if minServerCount < 2 { - return nil, fmt.Errorf("min server count must be at least 2") - } - config.MinServerCount = minServerCount - } - - // Parse off-hours settings - config.MoveDuringOffHours = len(formData["move_during_off_hours"]) > 0 - - if values, ok := formData["off_hours_start"]; ok && len(values) > 0 { - config.OffHoursStart = values[0] - } - - if values, ok := formData["off_hours_end"]; ok && len(values) > 0 { - config.OffHoursEnd = values[0] - } - - // Parse min interval - if values, ok := formData["min_interval"]; ok && len(values) > 0 { - duration, err := time.ParseDuration(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid min interval: %w", err) - } - config.MinIntervalSeconds = int(duration.Seconds()) - } - - return config, nil -} - -// GetCurrentConfig returns the current configuration -func (ui *UIProvider) GetCurrentConfig() interface{} { - return ui.getCurrentBalanceConfig() -} - -// ApplyConfig applies the new configuration -func (ui *UIProvider) ApplyConfig(config interface{}) error { - balanceConfig, ok := config.(*BalanceConfig) - if !ok { - return fmt.Errorf("invalid config type, 
expected *BalanceConfig") - } - - // Apply to detector - if ui.detector != nil { - ui.detector.SetEnabled(balanceConfig.Enabled) - ui.detector.SetThreshold(balanceConfig.ImbalanceThreshold) - ui.detector.SetMinCheckInterval(secondsToDuration(balanceConfig.ScanIntervalSeconds)) - } - - // Apply to scheduler - if ui.scheduler != nil { - ui.scheduler.SetEnabled(balanceConfig.Enabled) - ui.scheduler.SetMaxConcurrent(balanceConfig.MaxConcurrent) - ui.scheduler.SetMinServerCount(balanceConfig.MinServerCount) - ui.scheduler.SetMoveDuringOffHours(balanceConfig.MoveDuringOffHours) - ui.scheduler.SetOffHoursStart(balanceConfig.OffHoursStart) - ui.scheduler.SetOffHoursEnd(balanceConfig.OffHoursEnd) - } - - glog.V(1).Infof("Applied balance configuration: enabled=%v, threshold=%.1f%%, max_concurrent=%d, min_servers=%d, off_hours=%v", - balanceConfig.Enabled, balanceConfig.ImbalanceThreshold*100, balanceConfig.MaxConcurrent, - balanceConfig.MinServerCount, balanceConfig.MoveDuringOffHours) - - return nil -} - -// getCurrentBalanceConfig gets the current configuration from detector and scheduler -func (ui *UIProvider) getCurrentBalanceConfig() *BalanceConfig { - config := &BalanceConfig{ - // Default values (fallback if detectors/schedulers are nil) - Enabled: true, - ImbalanceThreshold: 0.1, // 10% imbalance - ScanIntervalSeconds: durationToSeconds(4 * time.Hour), - MaxConcurrent: 1, - MinServerCount: 3, - MoveDuringOffHours: true, - OffHoursStart: "23:00", - OffHoursEnd: "06:00", - MinIntervalSeconds: durationToSeconds(1 * time.Hour), - } - - // Get current values from detector - if ui.detector != nil { - config.Enabled = ui.detector.IsEnabled() - config.ImbalanceThreshold = ui.detector.GetThreshold() - config.ScanIntervalSeconds = int(ui.detector.ScanInterval().Seconds()) - } - - // Get current values from scheduler - if ui.scheduler != nil { - config.MaxConcurrent = ui.scheduler.GetMaxConcurrent() - config.MinServerCount = ui.scheduler.GetMinServerCount() - config.MoveDuringOffHours = ui.scheduler.GetMoveDuringOffHours() - config.OffHoursStart = ui.scheduler.GetOffHoursStart() - config.OffHoursEnd = ui.scheduler.GetOffHoursEnd() - } - - return config -} - -// RegisterUI registers the balance UI provider with the UI registry -func RegisterUI(uiRegistry *types.UIRegistry, detector *BalanceDetector, scheduler *BalanceScheduler) { - uiProvider := NewUIProvider(detector, scheduler) - uiRegistry.RegisterUI(uiProvider) - - glog.V(1).Infof("
Registered balance task UI provider") -} - -// DefaultBalanceConfig returns default balance configuration -func DefaultBalanceConfig() *BalanceConfig { - return &BalanceConfig{ - Enabled: false, - ImbalanceThreshold: 0.3, - ScanIntervalSeconds: durationToSeconds(4 * time.Hour), - MaxConcurrent: 1, - MinServerCount: 3, - MoveDuringOffHours: false, - OffHoursStart: "22:00", - OffHoursEnd: "06:00", - MinIntervalSeconds: durationToSeconds(1 * time.Hour), - } -} diff --git a/weed/worker/tasks/base/generic_components.go b/weed/worker/tasks/base/generic_components.go new file mode 100644 index 000000000..27ad1bb29 --- /dev/null +++ b/weed/worker/tasks/base/generic_components.go @@ -0,0 +1,129 @@ +package base + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// GenericDetector implements TaskDetector using function-based logic +type GenericDetector struct { + taskDef *TaskDefinition +} + +// NewGenericDetector creates a detector from a task definition +func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector { + return &GenericDetector{taskDef: taskDef} +} + +// GetTaskType returns the task type +func (d *GenericDetector) GetTaskType() types.TaskType { + return d.taskDef.Type +} + +// ScanForTasks scans using the task definition's detection function +func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { + if d.taskDef.DetectionFunc == nil { + return nil, nil + } + return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config) +} + +// ScanInterval returns the scan interval from task definition +func (d *GenericDetector) ScanInterval() time.Duration { + if d.taskDef.ScanInterval > 0 { + return d.taskDef.ScanInterval + } + return 30 * time.Minute // Default +} + +// IsEnabled returns whether this detector is enabled +func (d *GenericDetector) IsEnabled() bool { + return d.taskDef.Config.IsEnabled() +} + +// GenericScheduler implements TaskScheduler using function-based logic +type GenericScheduler struct { + taskDef *TaskDefinition +} + +// NewGenericScheduler creates a scheduler from a task definition +func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler { + return &GenericScheduler{taskDef: taskDef} +} + +// GetTaskType returns the task type +func (s *GenericScheduler) GetTaskType() types.TaskType { + return s.taskDef.Type +} + +// CanScheduleNow determines if a task can be scheduled using the task definition's function +func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { + if s.taskDef.SchedulingFunc == nil { + return s.defaultCanSchedule(task, runningTasks, availableWorkers) + } + return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config) +} + +// defaultCanSchedule provides default scheduling logic +func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { + if !s.taskDef.Config.IsEnabled() { + return false + } + + // Count running tasks of this type + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == s.taskDef.Type { + runningCount++ + } + } + + // Check concurrency limit + maxConcurrent := s.taskDef.MaxConcurrent + if maxConcurrent <= 0 { + maxConcurrent = 1 // Default + } + if runningCount >= maxConcurrent { + return false + } + + // Check if we have available workers + for _, worker := range 
availableWorkers { + if worker.CurrentLoad < worker.MaxConcurrent { + for _, capability := range worker.Capabilities { + if capability == s.taskDef.Type { + return true + } + } + } + } + + return false +} + +// GetPriority returns the priority for this task +func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority { + return task.Priority +} + +// GetMaxConcurrent returns max concurrent tasks +func (s *GenericScheduler) GetMaxConcurrent() int { + if s.taskDef.MaxConcurrent > 0 { + return s.taskDef.MaxConcurrent + } + return 1 // Default +} + +// GetDefaultRepeatInterval returns the default repeat interval +func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration { + if s.taskDef.RepeatInterval > 0 { + return s.taskDef.RepeatInterval + } + return 24 * time.Hour // Default +} + +// IsEnabled returns whether this scheduler is enabled +func (s *GenericScheduler) IsEnabled() bool { + return s.taskDef.Config.IsEnabled() +} diff --git a/weed/worker/tasks/base/registration.go b/weed/worker/tasks/base/registration.go new file mode 100644 index 000000000..416b6f6b8 --- /dev/null +++ b/weed/worker/tasks/base/registration.go @@ -0,0 +1,155 @@ +package base + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// GenericFactory creates task instances using a TaskDefinition +type GenericFactory struct { + *tasks.BaseTaskFactory + taskDef *TaskDefinition +} + +// NewGenericFactory creates a generic task factory +func NewGenericFactory(taskDef *TaskDefinition) *GenericFactory { + return &GenericFactory{ + BaseTaskFactory: tasks.NewBaseTaskFactory( + taskDef.Type, + taskDef.Capabilities, + taskDef.Description, + ), + taskDef: taskDef, + } +} + +// Create creates a task instance using the task definition +func (f *GenericFactory) Create(params types.TaskParams) (types.TaskInterface, error) { + if f.taskDef.CreateTask == nil { + return nil, fmt.Errorf("no task creation function defined for %s", f.taskDef.Type) + } + return f.taskDef.CreateTask(params) +} + +// GenericSchemaProvider provides config schema from TaskDefinition +type GenericSchemaProvider struct { + taskDef *TaskDefinition +} + +// GetConfigSchema returns the schema from task definition +func (p *GenericSchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema { + return &tasks.TaskConfigSchema{ + TaskName: string(p.taskDef.Type), + DisplayName: p.taskDef.DisplayName, + Description: p.taskDef.Description, + Icon: p.taskDef.Icon, + Schema: config.Schema{ + Fields: p.taskDef.ConfigSpec.Fields, + }, + } +} + +// GenericUIProvider provides UI functionality from TaskDefinition +type GenericUIProvider struct { + taskDef *TaskDefinition +} + +// GetTaskType returns the task type +func (ui *GenericUIProvider) GetTaskType() types.TaskType { + return ui.taskDef.Type +} + +// GetDisplayName returns the human-readable name +func (ui *GenericUIProvider) GetDisplayName() string { + return ui.taskDef.DisplayName +} + +// GetDescription returns a description of what this task does +func (ui *GenericUIProvider) GetDescription() string { + return ui.taskDef.Description +} + +// GetIcon returns the icon CSS class for this task type +func (ui *GenericUIProvider) GetIcon() string { + return ui.taskDef.Icon +} + +// GetCurrentConfig returns current config as TaskConfig +func (ui *GenericUIProvider) 
GetCurrentConfig() types.TaskConfig { + return ui.taskDef.Config +} + +// ApplyTaskPolicy applies protobuf TaskPolicy configuration +func (ui *GenericUIProvider) ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error { + return ui.taskDef.Config.FromTaskPolicy(policy) +} + +// ApplyTaskConfig applies TaskConfig interface configuration +func (ui *GenericUIProvider) ApplyTaskConfig(config types.TaskConfig) error { + taskPolicy := config.ToTaskPolicy() + return ui.taskDef.Config.FromTaskPolicy(taskPolicy) +} + +// RegisterTask registers a complete task definition with all registries +func RegisterTask(taskDef *TaskDefinition) { + // Validate task definition + if err := validateTaskDefinition(taskDef); err != nil { + glog.Errorf("Invalid task definition for %s: %v", taskDef.Type, err) + return + } + + // Create and register factory + factory := NewGenericFactory(taskDef) + tasks.AutoRegister(taskDef.Type, factory) + + // Create and register detector/scheduler + detector := NewGenericDetector(taskDef) + scheduler := NewGenericScheduler(taskDef) + + tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { + registry.RegisterTask(detector, scheduler) + }) + + // Create and register schema provider + schemaProvider := &GenericSchemaProvider{taskDef: taskDef} + tasks.RegisterTaskConfigSchema(string(taskDef.Type), schemaProvider) + + // Create and register UI provider + uiProvider := &GenericUIProvider{taskDef: taskDef} + tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { + baseUIProvider := tasks.NewBaseUIProvider( + taskDef.Type, + taskDef.DisplayName, + taskDef.Description, + taskDef.Icon, + schemaProvider.GetConfigSchema, + uiProvider.GetCurrentConfig, + uiProvider.ApplyTaskPolicy, + uiProvider.ApplyTaskConfig, + ) + uiRegistry.RegisterUI(baseUIProvider) + }) + + glog.V(1).Infof("
Registered complete task definition: %s", taskDef.Type) +} + +// validateTaskDefinition ensures the task definition is complete +func validateTaskDefinition(taskDef *TaskDefinition) error { + if taskDef.Type == "" { + return fmt.Errorf("task type is required") + } + if taskDef.Name == "" { + return fmt.Errorf("task name is required") + } + if taskDef.Config == nil { + return fmt.Errorf("task config is required") + } + // CreateTask is optional for tasks that use the typed task system + // The typed system registers tasks separately via types.RegisterGlobalTypedTask() + return nil +} diff --git a/weed/worker/tasks/base/task_definition.go b/weed/worker/tasks/base/task_definition.go new file mode 100644 index 000000000..6689d9c81 --- /dev/null +++ b/weed/worker/tasks/base/task_definition.go @@ -0,0 +1,272 @@ +package base + +import ( + "fmt" + "reflect" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// TaskDefinition encapsulates everything needed to define a complete task type +type TaskDefinition struct { + // Basic task information + Type types.TaskType + Name string + DisplayName string + Description string + Icon string + Capabilities []string + + // Task configuration + Config TaskConfig + ConfigSpec ConfigSpec + + // Task creation + CreateTask func(params types.TaskParams) (types.TaskInterface, error) + + // Detection logic + DetectionFunc func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config TaskConfig) ([]*types.TaskDetectionResult, error) + ScanInterval time.Duration + + // Scheduling logic + SchedulingFunc func(task *types.Task, running []*types.Task, workers []*types.Worker, config TaskConfig) bool + MaxConcurrent int + RepeatInterval time.Duration +} + +// TaskConfig provides a configuration interface that supports type-safe defaults +type TaskConfig interface { + config.ConfigWithDefaults // Extends ConfigWithDefaults for type-safe schema operations + IsEnabled() bool + SetEnabled(bool) + ToTaskPolicy() *worker_pb.TaskPolicy + FromTaskPolicy(policy *worker_pb.TaskPolicy) error +} + +// ConfigSpec defines the configuration schema +type ConfigSpec struct { + Fields []*config.Field +} + +// BaseConfig provides common configuration fields with reflection-based serialization +type BaseConfig struct { + Enabled bool `json:"enabled"` + ScanIntervalSeconds int `json:"scan_interval_seconds"` + MaxConcurrent int `json:"max_concurrent"` +} + +// IsEnabled returns whether the task is enabled +func (c *BaseConfig) IsEnabled() bool { + return c.Enabled +} + +// SetEnabled sets whether the task is enabled +func (c *BaseConfig) SetEnabled(enabled bool) { + c.Enabled = enabled +} + +// Validate validates the base configuration +func (c *BaseConfig) Validate() error { + // Common validation logic + return nil +} + +// StructToMap converts any struct to a map using reflection +func StructToMap(obj interface{}) map[string]interface{} { + result := make(map[string]interface{}) + val := reflect.ValueOf(obj) + + // Handle pointer to struct + if val.Kind() == reflect.Ptr { + val = val.Elem() + } + + if val.Kind() != reflect.Struct { + return result + } + + typ := val.Type() + + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // Skip unexported fields + if !field.CanInterface() { + continue + } + + // Handle embedded structs recursively (before JSON tag check) + if field.Kind() == reflect.Struct && 
fieldType.Anonymous {
+			embeddedMap := StructToMap(field.Interface())
+			for k, v := range embeddedMap {
+				result[k] = v
+			}
+			continue
+		}
+
+		// Get JSON tag name
+		jsonTag := fieldType.Tag.Get("json")
+		if jsonTag == "" || jsonTag == "-" {
+			continue
+		}
+
+		// Remove options like ",omitempty"
+		if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
+			jsonTag = jsonTag[:commaIdx]
+		}
+
+		result[jsonTag] = field.Interface()
+	}
+	return result
+}
+
+// MapToStruct loads data from map into struct using reflection
+func MapToStruct(data map[string]interface{}, obj interface{}) error {
+	val := reflect.ValueOf(obj)
+
+	// Must be pointer to struct
+	if val.Kind() != reflect.Ptr || val.Elem().Kind() != reflect.Struct {
+		return fmt.Errorf("obj must be pointer to struct")
+	}
+
+	val = val.Elem()
+	typ := val.Type()
+
+	for i := 0; i < val.NumField(); i++ {
+		field := val.Field(i)
+		fieldType := typ.Field(i)
+
+		// Skip unexported fields
+		if !field.CanSet() {
+			continue
+		}
+
+		// Handle embedded structs recursively (before JSON tag check)
+		if field.Kind() == reflect.Struct && fieldType.Anonymous {
+			err := MapToStruct(data, field.Addr().Interface())
+			if err != nil {
+				return err
+			}
+			continue
+		}
+
+		// Get JSON tag name
+		jsonTag := fieldType.Tag.Get("json")
+		if jsonTag == "" || jsonTag == "-" {
+			continue
+		}
+
+		// Remove options like ",omitempty"
+		if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
+			jsonTag = jsonTag[:commaIdx]
+		}
+
+		if value, exists := data[jsonTag]; exists {
+			err := setFieldValue(field, value)
+			if err != nil {
+				return fmt.Errorf("failed to set field %s: %v", jsonTag, err)
+			}
+		}
+	}
+
+	return nil
+}
+
+// ToTaskPolicy converts BaseConfig to protobuf (partial implementation)
+// Note: Concrete implementations should override this to include task-specific config
+func (c *BaseConfig) ToTaskPolicy() *worker_pb.TaskPolicy {
+	return &worker_pb.TaskPolicy{
+		Enabled:               c.Enabled,
+		MaxConcurrent:         int32(c.MaxConcurrent),
+		RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+		CheckIntervalSeconds:  int32(c.ScanIntervalSeconds),
+		// TaskConfig field should be set by concrete implementations
+	}
+}
+
+// FromTaskPolicy loads BaseConfig from protobuf (partial implementation)
+// Note: Concrete implementations should override this to handle task-specific config
+func (c *BaseConfig) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+	if policy == nil {
+		return fmt.Errorf("policy is nil")
+	}
+	c.Enabled = policy.Enabled
+	c.MaxConcurrent = int(policy.MaxConcurrent)
+	c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds)
+	return nil
+}
+
+// ApplySchemaDefaults applies default values from schema using reflection
+func (c *BaseConfig) ApplySchemaDefaults(schema *config.Schema) error {
+	// Use reflection-based approach for BaseConfig since it needs to handle embedded structs
+	return schema.ApplyDefaultsToProtobuf(c)
+}
+
+// setFieldValue sets a field value with type conversion
+func setFieldValue(field reflect.Value, value interface{}) error {
+	if value == nil {
+		return nil
+	}
+
+	valueVal := reflect.ValueOf(value)
+	fieldType := field.Type()
+	valueType := valueVal.Type()
+
+	// Direct assignment if types match
+	if valueType.AssignableTo(fieldType) {
+		field.Set(valueVal)
+		return nil
+	}
+
+	// Type conversion for common cases
+	switch fieldType.Kind() {
+	case reflect.Bool:
+		if b, ok := value.(bool); ok {
+			field.SetBool(b)
+		} else {
+			return fmt.Errorf("cannot convert %T to bool", 
value) + } + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + switch v := value.(type) { + case int: + field.SetInt(int64(v)) + case int32: + field.SetInt(int64(v)) + case int64: + field.SetInt(v) + case float64: + field.SetInt(int64(v)) + default: + return fmt.Errorf("cannot convert %T to int", value) + } + case reflect.Float32, reflect.Float64: + switch v := value.(type) { + case float32: + field.SetFloat(float64(v)) + case float64: + field.SetFloat(v) + case int: + field.SetFloat(float64(v)) + case int64: + field.SetFloat(float64(v)) + default: + return fmt.Errorf("cannot convert %T to float", value) + } + case reflect.String: + if s, ok := value.(string); ok { + field.SetString(s) + } else { + return fmt.Errorf("cannot convert %T to string", value) + } + default: + return fmt.Errorf("unsupported field type %s", fieldType.Kind()) + } + + return nil +} diff --git a/weed/worker/tasks/base/task_definition_test.go b/weed/worker/tasks/base/task_definition_test.go new file mode 100644 index 000000000..a0a0a5a24 --- /dev/null +++ b/weed/worker/tasks/base/task_definition_test.go @@ -0,0 +1,338 @@ +package base + +import ( + "reflect" + "testing" +) + +// Test structs that mirror the actual configuration structure +type TestBaseConfig struct { + Enabled bool `json:"enabled"` + ScanIntervalSeconds int `json:"scan_interval_seconds"` + MaxConcurrent int `json:"max_concurrent"` +} + +type TestTaskConfig struct { + TestBaseConfig + TaskSpecificField float64 `json:"task_specific_field"` + AnotherSpecificField string `json:"another_specific_field"` +} + +type TestNestedConfig struct { + TestBaseConfig + NestedStruct struct { + NestedField string `json:"nested_field"` + } `json:"nested_struct"` + TaskField int `json:"task_field"` +} + +func TestStructToMap_WithEmbeddedStruct(t *testing.T) { + // Test case 1: Basic embedded struct + config := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 1800, + MaxConcurrent: 3, + }, + TaskSpecificField: 0.25, + AnotherSpecificField: "test_value", + } + + result := StructToMap(config) + + // Verify all fields are present + expectedFields := map[string]interface{}{ + "enabled": true, + "scan_interval_seconds": 1800, + "max_concurrent": 3, + "task_specific_field": 0.25, + "another_specific_field": "test_value", + } + + if len(result) != len(expectedFields) { + t.Errorf("Expected %d fields, got %d. 
Result: %+v", len(expectedFields), len(result), result) + } + + for key, expectedValue := range expectedFields { + if actualValue, exists := result[key]; !exists { + t.Errorf("Missing field: %s", key) + } else if !reflect.DeepEqual(actualValue, expectedValue) { + t.Errorf("Field %s: expected %v (%T), got %v (%T)", key, expectedValue, expectedValue, actualValue, actualValue) + } + } +} + +func TestStructToMap_WithNestedStruct(t *testing.T) { + config := &TestNestedConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: false, + ScanIntervalSeconds: 3600, + MaxConcurrent: 1, + }, + NestedStruct: struct { + NestedField string `json:"nested_field"` + }{ + NestedField: "nested_value", + }, + TaskField: 42, + } + + result := StructToMap(config) + + // Verify embedded struct fields are included + if enabled, exists := result["enabled"]; !exists || enabled != false { + t.Errorf("Expected enabled=false from embedded struct, got %v", enabled) + } + + if scanInterval, exists := result["scan_interval_seconds"]; !exists || scanInterval != 3600 { + t.Errorf("Expected scan_interval_seconds=3600 from embedded struct, got %v", scanInterval) + } + + if maxConcurrent, exists := result["max_concurrent"]; !exists || maxConcurrent != 1 { + t.Errorf("Expected max_concurrent=1 from embedded struct, got %v", maxConcurrent) + } + + // Verify regular fields are included + if taskField, exists := result["task_field"]; !exists || taskField != 42 { + t.Errorf("Expected task_field=42, got %v", taskField) + } + + // Verify nested struct is included as a whole + if nestedStruct, exists := result["nested_struct"]; !exists { + t.Errorf("Missing nested_struct field") + } else { + // The nested struct should be included as-is, not flattened + if nested, ok := nestedStruct.(struct { + NestedField string `json:"nested_field"` + }); !ok || nested.NestedField != "nested_value" { + t.Errorf("Expected nested_struct with NestedField='nested_value', got %v", nestedStruct) + } + } +} + +func TestMapToStruct_WithEmbeddedStruct(t *testing.T) { + // Test data with all fields including embedded struct fields + data := map[string]interface{}{ + "enabled": true, + "scan_interval_seconds": 2400, + "max_concurrent": 5, + "task_specific_field": 0.15, + "another_specific_field": "updated_value", + } + + config := &TestTaskConfig{} + err := MapToStruct(data, config) + + if err != nil { + t.Fatalf("MapToStruct failed: %v", err) + } + + // Verify embedded struct fields were set + if config.Enabled != true { + t.Errorf("Expected Enabled=true, got %v", config.Enabled) + } + + if config.ScanIntervalSeconds != 2400 { + t.Errorf("Expected ScanIntervalSeconds=2400, got %v", config.ScanIntervalSeconds) + } + + if config.MaxConcurrent != 5 { + t.Errorf("Expected MaxConcurrent=5, got %v", config.MaxConcurrent) + } + + // Verify regular fields were set + if config.TaskSpecificField != 0.15 { + t.Errorf("Expected TaskSpecificField=0.15, got %v", config.TaskSpecificField) + } + + if config.AnotherSpecificField != "updated_value" { + t.Errorf("Expected AnotherSpecificField='updated_value', got %v", config.AnotherSpecificField) + } +} + +func TestMapToStruct_PartialData(t *testing.T) { + // Test with only some fields present (simulating form data) + data := map[string]interface{}{ + "enabled": false, + "max_concurrent": 2, + "task_specific_field": 0.30, + } + + // Start with some initial values + config := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 1800, + MaxConcurrent: 1, + }, + TaskSpecificField: 0.20, + 
AnotherSpecificField: "initial_value", + } + + err := MapToStruct(data, config) + + if err != nil { + t.Fatalf("MapToStruct failed: %v", err) + } + + // Verify updated fields + if config.Enabled != false { + t.Errorf("Expected Enabled=false (updated), got %v", config.Enabled) + } + + if config.MaxConcurrent != 2 { + t.Errorf("Expected MaxConcurrent=2 (updated), got %v", config.MaxConcurrent) + } + + if config.TaskSpecificField != 0.30 { + t.Errorf("Expected TaskSpecificField=0.30 (updated), got %v", config.TaskSpecificField) + } + + // Verify unchanged fields remain the same + if config.ScanIntervalSeconds != 1800 { + t.Errorf("Expected ScanIntervalSeconds=1800 (unchanged), got %v", config.ScanIntervalSeconds) + } + + if config.AnotherSpecificField != "initial_value" { + t.Errorf("Expected AnotherSpecificField='initial_value' (unchanged), got %v", config.AnotherSpecificField) + } +} + +func TestRoundTripSerialization(t *testing.T) { + // Test complete round-trip: struct -> map -> struct + original := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 3600, + MaxConcurrent: 4, + }, + TaskSpecificField: 0.18, + AnotherSpecificField: "round_trip_test", + } + + // Convert to map + dataMap := StructToMap(original) + + // Convert back to struct + roundTrip := &TestTaskConfig{} + err := MapToStruct(dataMap, roundTrip) + + if err != nil { + t.Fatalf("Round-trip MapToStruct failed: %v", err) + } + + // Verify all fields match + if !reflect.DeepEqual(original.TestBaseConfig, roundTrip.TestBaseConfig) { + t.Errorf("BaseConfig mismatch:\nOriginal: %+v\nRound-trip: %+v", original.TestBaseConfig, roundTrip.TestBaseConfig) + } + + if original.TaskSpecificField != roundTrip.TaskSpecificField { + t.Errorf("TaskSpecificField mismatch: %v != %v", original.TaskSpecificField, roundTrip.TaskSpecificField) + } + + if original.AnotherSpecificField != roundTrip.AnotherSpecificField { + t.Errorf("AnotherSpecificField mismatch: %v != %v", original.AnotherSpecificField, roundTrip.AnotherSpecificField) + } +} + +func TestStructToMap_EmptyStruct(t *testing.T) { + config := &TestTaskConfig{} + result := StructToMap(config) + + // Should still include all fields, even with zero values + expectedFields := []string{"enabled", "scan_interval_seconds", "max_concurrent", "task_specific_field", "another_specific_field"} + + for _, field := range expectedFields { + if _, exists := result[field]; !exists { + t.Errorf("Missing field: %s", field) + } + } +} + +func TestStructToMap_NilPointer(t *testing.T) { + var config *TestTaskConfig = nil + result := StructToMap(config) + + if len(result) != 0 { + t.Errorf("Expected empty map for nil pointer, got %+v", result) + } +} + +func TestMapToStruct_InvalidInput(t *testing.T) { + data := map[string]interface{}{ + "enabled": "not_a_bool", // Wrong type + } + + config := &TestTaskConfig{} + err := MapToStruct(data, config) + + if err == nil { + t.Errorf("Expected error for invalid input type, but got none") + } +} + +func TestMapToStruct_NonPointer(t *testing.T) { + data := map[string]interface{}{ + "enabled": true, + } + + config := TestTaskConfig{} // Not a pointer + err := MapToStruct(data, config) + + if err == nil { + t.Errorf("Expected error for non-pointer input, but got none") + } +} + +// Benchmark tests to ensure performance is reasonable +func BenchmarkStructToMap(b *testing.B) { + config := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 1800, + MaxConcurrent: 3, + }, + TaskSpecificField: 0.25, 
+ AnotherSpecificField: "benchmark_test", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = StructToMap(config) + } +} + +func BenchmarkMapToStruct(b *testing.B) { + data := map[string]interface{}{ + "enabled": true, + "scan_interval_seconds": 1800, + "max_concurrent": 3, + "task_specific_field": 0.25, + "another_specific_field": "benchmark_test", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + config := &TestTaskConfig{} + _ = MapToStruct(data, config) + } +} + +func BenchmarkRoundTrip(b *testing.B) { + original := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 1800, + MaxConcurrent: 3, + }, + TaskSpecificField: 0.25, + AnotherSpecificField: "benchmark_test", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + dataMap := StructToMap(original) + roundTrip := &TestTaskConfig{} + _ = MapToStruct(dataMap, roundTrip) + } +} diff --git a/weed/worker/tasks/base/typed_task.go b/weed/worker/tasks/base/typed_task.go new file mode 100644 index 000000000..9d2839607 --- /dev/null +++ b/weed/worker/tasks/base/typed_task.go @@ -0,0 +1,218 @@ +package base + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// BaseTypedTask provides a base implementation for typed tasks with logger support +type BaseTypedTask struct { + taskType types.TaskType + taskID string + progress float64 + progressCallback func(float64) + cancelled bool + mutex sync.RWMutex + + // Logger functionality + logger tasks.TaskLogger + loggerConfig types.TaskLoggerConfig +} + +// NewBaseTypedTask creates a new base typed task +func NewBaseTypedTask(taskType types.TaskType) *BaseTypedTask { + return &BaseTypedTask{ + taskType: taskType, + progress: 0.0, + loggerConfig: types.TaskLoggerConfig{ + BaseLogDir: "/data/task_logs", + MaxTasks: 100, + MaxLogSizeMB: 10, + EnableConsole: true, + }, + } +} + +// GetType returns the task type +func (bt *BaseTypedTask) GetType() types.TaskType { + return bt.taskType +} + +// IsCancellable returns whether the task can be cancelled +func (bt *BaseTypedTask) IsCancellable() bool { + return true // Most tasks can be cancelled +} + +// Cancel cancels the task +func (bt *BaseTypedTask) Cancel() error { + bt.mutex.Lock() + defer bt.mutex.Unlock() + bt.cancelled = true + return nil +} + +// IsCancelled returns whether the task has been cancelled +func (bt *BaseTypedTask) IsCancelled() bool { + bt.mutex.RLock() + defer bt.mutex.RUnlock() + return bt.cancelled +} + +// GetProgress returns the current progress (0-100) +func (bt *BaseTypedTask) GetProgress() float64 { + bt.mutex.RLock() + defer bt.mutex.RUnlock() + return bt.progress +} + +// SetProgress sets the current progress and calls the callback if set +func (bt *BaseTypedTask) SetProgress(progress float64) { + bt.mutex.Lock() + callback := bt.progressCallback + bt.progress = progress + bt.mutex.Unlock() + + if callback != nil { + callback(progress) + } +} + +// SetProgressCallback sets the progress callback function +func (bt *BaseTypedTask) SetProgressCallback(callback func(float64)) { + bt.mutex.Lock() + defer bt.mutex.Unlock() + bt.progressCallback = callback +} + +// SetLoggerConfig sets the logger configuration for this task +func (bt *BaseTypedTask) SetLoggerConfig(config types.TaskLoggerConfig) { + bt.mutex.Lock() + defer bt.mutex.Unlock() + bt.loggerConfig = config +} + +// convertToTasksLoggerConfig converts 
types.TaskLoggerConfig to tasks.TaskLoggerConfig +func convertToTasksLoggerConfig(config types.TaskLoggerConfig) tasks.TaskLoggerConfig { + return tasks.TaskLoggerConfig{ + BaseLogDir: config.BaseLogDir, + MaxTasks: config.MaxTasks, + MaxLogSizeMB: config.MaxLogSizeMB, + EnableConsole: config.EnableConsole, + } +} + +// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface) +func (bt *BaseTypedTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error { + bt.mutex.Lock() + defer bt.mutex.Unlock() + + bt.taskID = taskID + + // Convert the logger config to the tasks package type + tasksLoggerConfig := convertToTasksLoggerConfig(bt.loggerConfig) + + logger, err := tasks.NewTaskLogger(taskID, bt.taskType, workerID, params, tasksLoggerConfig) + if err != nil { + return fmt.Errorf("failed to initialize task logger: %w", err) + } + + bt.logger = logger + if bt.logger != nil { + bt.logger.Info("BaseTypedTask initialized for task %s (type: %s)", taskID, bt.taskType) + } + + return nil +} + +// GetTaskLogger returns the task logger (LoggerProvider interface) +func (bt *BaseTypedTask) GetTaskLogger() types.TaskLogger { + bt.mutex.RLock() + defer bt.mutex.RUnlock() + return bt.logger +} + +// LogInfo logs an info message +func (bt *BaseTypedTask) LogInfo(message string, args ...interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.Info(message, args...) + } +} + +// LogWarning logs a warning message +func (bt *BaseTypedTask) LogWarning(message string, args ...interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.Warning(message, args...) + } +} + +// LogError logs an error message +func (bt *BaseTypedTask) LogError(message string, args ...interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.Error(message, args...) + } +} + +// LogDebug logs a debug message +func (bt *BaseTypedTask) LogDebug(message string, args ...interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.Debug(message, args...) 
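+		// As with the Log* helpers above, messages reach the log only after
+		// InitializeTaskLogger has run; with a nil logger they are dropped silently.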
+ } +} + +// LogWithFields logs a message with structured fields +func (bt *BaseTypedTask) LogWithFields(level string, message string, fields map[string]interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.LogWithFields(level, message, fields) + } +} + +// ValidateTyped provides basic validation for typed parameters +func (bt *BaseTypedTask) ValidateTyped(params *worker_pb.TaskParams) error { + if params == nil { + return errors.New("task parameters cannot be nil") + } + if params.VolumeId == 0 { + return errors.New("volume_id is required") + } + if params.Server == "" { + return errors.New("server is required") + } + return nil +} + +// EstimateTimeTyped provides a default time estimation +func (bt *BaseTypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration { + // Default estimation - concrete tasks should override this + return 5 * time.Minute +} + +// ExecuteTyped is a placeholder that concrete tasks must implement +func (bt *BaseTypedTask) ExecuteTyped(params *worker_pb.TaskParams) error { + panic("ExecuteTyped must be implemented by concrete task types") +} diff --git a/weed/worker/tasks/config_update_registry.go b/weed/worker/tasks/config_update_registry.go new file mode 100644 index 000000000..649c8b384 --- /dev/null +++ b/weed/worker/tasks/config_update_registry.go @@ -0,0 +1,67 @@ +package tasks + +import ( + "sync" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// ConfigUpdateFunc is a function type for updating task configurations +type ConfigUpdateFunc func(configPersistence interface{}) error + +// ConfigUpdateRegistry manages config update functions for all task types +type ConfigUpdateRegistry struct { + updaters map[types.TaskType]ConfigUpdateFunc + mutex sync.RWMutex +} + +var ( + globalConfigUpdateRegistry *ConfigUpdateRegistry + configUpdateRegistryOnce sync.Once +) + +// GetGlobalConfigUpdateRegistry returns the global config update registry (singleton) +func GetGlobalConfigUpdateRegistry() *ConfigUpdateRegistry { + configUpdateRegistryOnce.Do(func() { + globalConfigUpdateRegistry = &ConfigUpdateRegistry{ + updaters: make(map[types.TaskType]ConfigUpdateFunc), + } + glog.V(1).Infof("Created global config update registry") + }) + return globalConfigUpdateRegistry +} + +// RegisterConfigUpdater registers a config update function for a task type +func (r *ConfigUpdateRegistry) RegisterConfigUpdater(taskType types.TaskType, updateFunc ConfigUpdateFunc) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.updaters[taskType] = updateFunc + glog.V(1).Infof("Registered config updater for task type: %s", taskType) +} + +// UpdateAllConfigs updates configurations for all registered task types +func (r *ConfigUpdateRegistry) UpdateAllConfigs(configPersistence interface{}) { + r.mutex.RLock() + updaters := make(map[types.TaskType]ConfigUpdateFunc) + for k, v := range r.updaters { + updaters[k] = v + } + r.mutex.RUnlock() + + for taskType, updateFunc := range updaters { + if err := updateFunc(configPersistence); err != nil { + glog.Warningf("Failed to load %s configuration from persistence: %v", taskType, err) + } else { + glog.V(1).Infof("Loaded %s configuration from persistence", taskType) + } + } + + glog.V(1).Infof("All task configurations loaded from persistence") +} + +// AutoRegisterConfigUpdater is a convenience function for registering config updaters +func AutoRegisterConfigUpdater(taskType types.TaskType, updateFunc ConfigUpdateFunc) { + registry 
:= GetGlobalConfigUpdateRegistry() + registry.RegisterConfigUpdater(taskType, updateFunc) +} diff --git a/weed/worker/tasks/erasure_coding/config.go b/weed/worker/tasks/erasure_coding/config.go new file mode 100644 index 000000000..1f70fb8db --- /dev/null +++ b/weed/worker/tasks/erasure_coding/config.go @@ -0,0 +1,207 @@ +package erasure_coding + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with erasure coding specific settings +type Config struct { + base.BaseConfig + QuietForSeconds int `json:"quiet_for_seconds"` + FullnessRatio float64 `json:"fullness_ratio"` + CollectionFilter string `json:"collection_filter"` + MinSizeMB int `json:"min_size_mb"` +} + +// NewDefaultConfig creates a new default erasure coding configuration +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 60 * 60, // 1 hour + MaxConcurrent: 1, + }, + QuietForSeconds: 300, // 5 minutes + FullnessRatio: 0.8, // 80% + CollectionFilter: "", + MinSizeMB: 30, // 30MB (more reasonable than 100MB) + } +} + +// GetConfigSpec returns the configuration schema for erasure coding tasks +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable Erasure Coding Tasks", + Description: "Whether erasure coding tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic erasure coding task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 60 * 60, + MinValue: 10 * 60, + MaxValue: 24 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing erasure coding", + HelpText: "The system will check for volumes that need erasure coding at this interval", + Placeholder: "1", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 1, + MinValue: 1, + MaxValue: 5, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of erasure coding tasks that can run simultaneously", + HelpText: "Limits the number of erasure coding operations running at the same time", + Placeholder: "1 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "quiet_for_seconds", + JSONName: "quiet_for_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 300, + MinValue: 60, + MaxValue: 3600, + Required: true, + DisplayName: "Quiet Period", + Description: "Minimum time volume must be quiet before erasure coding", + HelpText: "Volume must not be modified for this duration before erasure coding", + Placeholder: "5", + Unit: config.UnitMinutes, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "fullness_ratio", + JSONName: "fullness_ratio", + Type: config.FieldTypeFloat, + DefaultValue: 0.8, + MinValue: 0.1, + MaxValue: 1.0, + Required: true, + DisplayName: "Fullness Ratio", + Description: "Minimum fullness ratio to trigger erasure coding", + 
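+				// Note: expressed as a fraction of volume capacity (0.1-1.0), not a percentage.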
HelpText: "Only volumes with this fullness ratio or higher will be erasure coded", + Placeholder: "0.80 (80%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "collection_filter", + JSONName: "collection_filter", + Type: config.FieldTypeString, + DefaultValue: "", + Required: false, + DisplayName: "Collection Filter", + Description: "Only process volumes from specific collections", + HelpText: "Leave empty to process all collections, or specify collection name", + Placeholder: "my_collection", + InputType: "text", + CSSClasses: "form-control", + }, + { + Name: "min_size_mb", + JSONName: "min_size_mb", + Type: config.FieldTypeInt, + DefaultValue: 30, + MinValue: 1, + MaxValue: 1000, + Required: true, + DisplayName: "Minimum Size (MB)", + Description: "Minimum volume size to consider for erasure coding", + HelpText: "Only volumes larger than this size will be considered for erasure coding", + Placeholder: "30", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + }, + } +} + +// ToTaskPolicy converts configuration to a TaskPolicy protobuf message +func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { + return &worker_pb.TaskPolicy{ + Enabled: c.Enabled, + MaxConcurrent: int32(c.MaxConcurrent), + RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), + CheckIntervalSeconds: int32(c.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{ + ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{ + FullnessRatio: float64(c.FullnessRatio), + QuietForSeconds: int32(c.QuietForSeconds), + MinVolumeSizeMb: int32(c.MinSizeMB), + CollectionFilter: c.CollectionFilter, + }, + }, + } +} + +// FromTaskPolicy loads configuration from a TaskPolicy protobuf message +func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { + if policy == nil { + return fmt.Errorf("policy is nil") + } + + // Set general TaskPolicy fields + c.Enabled = policy.Enabled + c.MaxConcurrent = int(policy.MaxConcurrent) + c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping + + // Set erasure coding-specific fields from the task config + if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil { + c.FullnessRatio = float64(ecConfig.FullnessRatio) + c.QuietForSeconds = int(ecConfig.QuietForSeconds) + c.MinSizeMB = int(ecConfig.MinVolumeSizeMb) + c.CollectionFilter = ecConfig.CollectionFilter + } + + return nil +} + +// LoadConfigFromPersistence loads configuration from the persistence layer if available +func LoadConfigFromPersistence(configPersistence interface{}) *Config { + config := NewDefaultConfig() + + // Try to load from persistence if available + if persistence, ok := configPersistence.(interface { + LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error) + }); ok { + if policy, err := persistence.LoadErasureCodingTaskPolicy(); err == nil && policy != nil { + if err := config.FromTaskPolicy(policy); err == nil { + glog.V(1).Infof("Loaded erasure coding configuration from persistence") + return config + } + } + } + + glog.V(1).Infof("Using default erasure coding configuration") + return config +} diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go new file mode 100644 index 000000000..1a2558396 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -0,0 +1,140 @@ +package erasure_coding + +import ( + "fmt" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + 
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Detection implements the detection logic for erasure coding tasks +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + ecConfig := config.(*Config) + var results []*types.TaskDetectionResult + now := time.Now() + quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second + minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum + + debugCount := 0 + skippedAlreadyEC := 0 + skippedTooSmall := 0 + skippedCollectionFilter := 0 + skippedQuietTime := 0 + skippedFullness := 0 + + for _, metric := range metrics { + // Skip if already EC volume + if metric.IsECVolume { + skippedAlreadyEC++ + continue + } + + // Check minimum size requirement + if metric.Size < minSizeBytes { + skippedTooSmall++ + continue + } + + // Check collection filter if specified + if ecConfig.CollectionFilter != "" { + // Parse comma-separated collections + allowedCollections := make(map[string]bool) + for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") { + allowedCollections[strings.TrimSpace(collection)] = true + } + // Skip if volume's collection is not in the allowed list + if !allowedCollections[metric.Collection] { + skippedCollectionFilter++ + continue + } + } + + // Check quiet duration and fullness criteria + if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeErasureCoding, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: types.TaskPriorityLow, // EC is not urgent + Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)", + metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100, + float64(metric.Size)/(1024*1024)), + ScheduleAt: now, + } + results = append(results, result) + } else { + // Count debug reasons + if debugCount < 5 { // Limit to avoid spam + if metric.Age < quietThreshold { + skippedQuietTime++ + } + if metric.FullnessRatio < ecConfig.FullnessRatio { + skippedFullness++ + } + } + debugCount++ + } + } + + // Log debug summary if no tasks were created + if len(results) == 0 && len(metrics) > 0 { + totalVolumes := len(metrics) + glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)", + totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness) + + // Show details for first few volumes + for i, metric := range metrics { + if i >= 3 || metric.IsECVolume { // Limit to first 3 non-EC volumes + continue + } + sizeMB := float64(metric.Size) / (1024 * 1024) + glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need β₯%dMB), age=%s (need β₯%s), fullness=%.1f%% (need β₯%.1f%%)", + metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute), + metric.FullnessRatio*100, ecConfig.FullnessRatio*100) + } + } + + return results, nil +} + +// Scheduling implements the scheduling logic for erasure coding tasks +func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + ecConfig := 
config.(*Config) + + // Check if we have available workers + if len(availableWorkers) == 0 { + return false + } + + // Count running EC tasks + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeErasureCoding { + runningCount++ + } + } + + // Check concurrency limit + if runningCount >= ecConfig.MaxConcurrent { + return false + } + + // Check if any worker can handle EC tasks + for _, worker := range availableWorkers { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeErasureCoding { + return true + } + } + } + + return false +} diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go index 641dfc6b5..8dc7a1cd0 100644 --- a/weed/worker/tasks/erasure_coding/ec.go +++ b/weed/worker/tasks/erasure_coding/ec.go @@ -1,79 +1,785 @@ package erasure_coding import ( + "context" "fmt" + "io" + "math" + "os" + "path/filepath" + "strings" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) -// Task implements erasure coding operation to convert volumes to EC format +// Task implements comprehensive erasure coding with protobuf parameters type Task struct { - *tasks.BaseTask - server string - volumeID uint32 + *base.BaseTypedTask + + // Current task state + sourceServer string + volumeID uint32 + collection string + workDir string + masterClient string + grpcDialOpt grpc.DialOption + + // EC parameters from protobuf + destinations []*worker_pb.ECDestination // Disk-aware destinations + existingShardLocations []*worker_pb.ExistingECShardLocation // Existing shards to cleanup + estimatedShardSize uint64 + dataShards int + parityShards int + cleanupSource bool + + // Progress tracking + currentStep string + stepProgress map[string]float64 } -// NewTask creates a new erasure coding task instance -func NewTask(server string, volumeID uint32) *Task { +// NewTask creates a new erasure coding task +func NewTask() types.TypedTaskInterface { task := &Task{ - BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding), - server: server, - volumeID: volumeID, + BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeErasureCoding), + masterClient: "localhost:9333", // Default master client + workDir: "/tmp/seaweedfs_ec_work", // Default work directory + grpcDialOpt: grpc.WithTransportCredentials(insecure.NewCredentials()), // Default to insecure + dataShards: erasure_coding.DataShardsCount, // Use package constant + parityShards: erasure_coding.ParityShardsCount, // Use package constant + stepProgress: make(map[string]float64), } return task } -// Execute executes the erasure coding task -func (t *Task) Execute(params types.TaskParams) error { - glog.Infof("Starting erasure coding task for volume %d on server %s", t.volumeID, t.server) +// ValidateTyped validates the typed parameters for EC task +func (t *Task) ValidateTyped(params *worker_pb.TaskParams) error { + // Basic validation 
from base class + if err := t.BaseTypedTask.ValidateTyped(params); err != nil { + return err + } + + // Check that we have EC-specific parameters + ecParams := params.GetErasureCodingParams() + if ecParams == nil { + return fmt.Errorf("erasure_coding_params is required for EC task") + } + + // Require destinations + if len(ecParams.Destinations) == 0 { + return fmt.Errorf("destinations must be specified for EC task") + } + + // DataShards and ParityShards are constants from erasure_coding package + expectedDataShards := int32(erasure_coding.DataShardsCount) + expectedParityShards := int32(erasure_coding.ParityShardsCount) + + if ecParams.DataShards > 0 && ecParams.DataShards != expectedDataShards { + return fmt.Errorf("data_shards must be %d (fixed constant), got %d", expectedDataShards, ecParams.DataShards) + } + if ecParams.ParityShards > 0 && ecParams.ParityShards != expectedParityShards { + return fmt.Errorf("parity_shards must be %d (fixed constant), got %d", expectedParityShards, ecParams.ParityShards) + } + + // Validate destination count + destinationCount := len(ecParams.Destinations) + totalShards := expectedDataShards + expectedParityShards + if totalShards > int32(destinationCount) { + return fmt.Errorf("insufficient destinations: need %d, have %d", totalShards, destinationCount) + } + + return nil +} + +// EstimateTimeTyped estimates the time needed for EC processing based on protobuf parameters +func (t *Task) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration { + baseTime := 20 * time.Minute // Processing takes time due to comprehensive operations + + ecParams := params.GetErasureCodingParams() + if ecParams != nil && ecParams.EstimatedShardSize > 0 { + // More accurate estimate based on shard size + // Account for copying, encoding, and distribution + gbSize := ecParams.EstimatedShardSize / (1024 * 1024 * 1024) + estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB + if estimatedTime > baseTime { + return estimatedTime + } + } + + return baseTime +} + +// ExecuteTyped implements the actual erasure coding workflow with typed parameters +func (t *Task) ExecuteTyped(params *worker_pb.TaskParams) error { + // Extract basic parameters + t.volumeID = params.VolumeId + t.sourceServer = params.Server + t.collection = params.Collection - // Simulate erasure coding operation with progress updates - steps := []struct { - name string - duration time.Duration - progress float64 - }{ - {"Analyzing volume", 2 * time.Second, 15}, - {"Creating EC shards", 5 * time.Second, 50}, - {"Verifying shards", 2 * time.Second, 75}, - {"Finalizing EC volume", 1 * time.Second, 100}, + // Extract EC-specific parameters + ecParams := params.GetErasureCodingParams() + if ecParams != nil { + t.destinations = ecParams.Destinations // Store disk-aware destinations + t.existingShardLocations = ecParams.ExistingShardLocations // Store existing shards for cleanup + t.estimatedShardSize = ecParams.EstimatedShardSize + t.cleanupSource = ecParams.CleanupSource + + // DataShards and ParityShards are constants, don't override from parameters + // t.dataShards and t.parityShards are already set to constants in NewTask + + if ecParams.WorkingDir != "" { + t.workDir = ecParams.WorkingDir + } + if ecParams.MasterClient != "" { + t.masterClient = ecParams.MasterClient + } } - for _, step := range steps { - if t.IsCancelled() { - return fmt.Errorf("erasure coding task cancelled") + // Determine available destinations for logging + var availableDestinations []string + for _, dest := range 
t.destinations { + availableDestinations = append(availableDestinations, fmt.Sprintf("%s(disk:%d)", dest.Node, dest.DiskId)) + } + + glog.V(1).Infof("Starting EC task for volume %d: %s -> %v (data:%d, parity:%d)", + t.volumeID, t.sourceServer, availableDestinations, t.dataShards, t.parityShards) + + // Create unique working directory for this task + taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix())) + if err := os.MkdirAll(taskWorkDir, 0755); err != nil { + return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err) + } + glog.V(1).Infof("WORKFLOW: Created working directory: %s", taskWorkDir) + + // Ensure cleanup of working directory + defer func() { + if err := os.RemoveAll(taskWorkDir); err != nil { + glog.Warningf("Failed to cleanup working directory %s: %v", taskWorkDir, err) + } else { + glog.V(1).Infof("WORKFLOW: Cleaned up working directory: %s", taskWorkDir) } + }() + + // Step 1: Collect volume locations from master + glog.V(1).Infof("WORKFLOW STEP 1: Collecting volume locations from master") + t.SetProgress(5.0) + volumeId := needle.VolumeId(t.volumeID) + volumeLocations, err := t.collectVolumeLocations(volumeId) + if err != nil { + return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err) + } + glog.V(1).Infof("WORKFLOW: Found volume %d on %d servers: %v", t.volumeID, len(volumeLocations), volumeLocations) - glog.V(1).Infof("Erasure coding task step: %s", step.name) - t.SetProgress(step.progress) + // Convert ServerAddress slice to string slice + var locationStrings []string + for _, addr := range volumeLocations { + locationStrings = append(locationStrings, string(addr)) + } - // Simulate work - time.Sleep(step.duration) + // Step 2: Check if volume has sufficient size for EC encoding + if !t.shouldPerformECEncoding(locationStrings) { + glog.Infof("Volume %d does not meet EC encoding criteria, skipping", t.volumeID) + t.SetProgress(100.0) + return nil } - glog.Infof("Erasure coding task completed for volume %d on server %s", t.volumeID, t.server) + // Step 2A: Cleanup existing EC shards if any + glog.V(1).Infof("WORKFLOW STEP 2A: Cleaning up existing EC shards for volume %d", t.volumeID) + t.SetProgress(10.0) + err = t.cleanupExistingEcShards() + if err != nil { + glog.Warningf("Failed to cleanup existing EC shards (continuing anyway): %v", err) + // Don't fail the task - this is just cleanup + } + glog.V(1).Infof("WORKFLOW: Existing EC shards cleanup completed for volume %d", t.volumeID) + + // Step 3: Mark volume readonly on all servers + glog.V(1).Infof("WORKFLOW STEP 2B: Marking volume %d readonly on all replica servers", t.volumeID) + t.SetProgress(15.0) + err = t.markVolumeReadonlyOnAllReplicas(needle.VolumeId(t.volumeID), locationStrings) + if err != nil { + return fmt.Errorf("failed to mark volume readonly: %v", err) + } + glog.V(1).Infof("WORKFLOW: Volume %d marked readonly on all replicas", t.volumeID) + + // Step 5: Copy volume files (.dat, .idx) to EC worker + glog.V(1).Infof("WORKFLOW STEP 3: Copying volume files from source server %s to EC worker", t.sourceServer) + t.SetProgress(25.0) + localVolumeFiles, err := t.copyVolumeFilesToWorker(taskWorkDir) + if err != nil { + return fmt.Errorf("failed to copy volume files to EC worker: %v", err) + } + glog.V(1).Infof("WORKFLOW: Volume files copied to EC worker: %v", localVolumeFiles) + + // Step 6: Generate EC shards locally on EC worker + glog.V(1).Infof("WORKFLOW STEP 4: Generating EC shards locally on EC worker") + 
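+	// All 14 shard files (.ec00-.ec13, 10 data + 4 parity) plus the .ecx index
+	// are produced on the worker's local disk before anything is sent over the
+	// network, so a failure during encoding needs no cleanup on the destinations.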
t.SetProgress(40.0) + localShardFiles, err := t.generateEcShardsLocally(localVolumeFiles, taskWorkDir) + if err != nil { + return fmt.Errorf("failed to generate EC shards locally: %v", err) + } + glog.V(1).Infof("WORKFLOW: EC shards generated locally: %d shard files", len(localShardFiles)) + + // Step 7: Distribute shards from EC worker to destination servers + glog.V(1).Infof("WORKFLOW STEP 5: Distributing EC shards from worker to destination servers") + t.SetProgress(60.0) + err = t.distributeEcShardsFromWorker(localShardFiles) + if err != nil { + return fmt.Errorf("failed to distribute EC shards from worker: %v", err) + } + glog.V(1).Infof("WORKFLOW: EC shards distributed to all destination servers") + + // Step 8: Mount EC shards on destination servers + glog.V(1).Infof("WORKFLOW STEP 6: Mounting EC shards on destination servers") + t.SetProgress(80.0) + err = t.mountEcShardsOnDestinations() + if err != nil { + return fmt.Errorf("failed to mount EC shards: %v", err) + } + glog.V(1).Infof("WORKFLOW: EC shards mounted successfully") + + // Step 9: Delete original volume from all locations + glog.V(1).Infof("WORKFLOW STEP 7: Deleting original volume %d from all replica servers", t.volumeID) + t.SetProgress(90.0) + err = t.deleteVolumeFromAllLocations(needle.VolumeId(t.volumeID), locationStrings) + if err != nil { + return fmt.Errorf("failed to delete original volume: %v", err) + } + glog.V(1).Infof("WORKFLOW: Original volume %d deleted from all locations", t.volumeID) + + t.SetProgress(100.0) + glog.Infof("EC task completed successfully for volume %d", t.volumeID) return nil } -// Validate validates the task parameters -func (t *Task) Validate(params types.TaskParams) error { - if params.VolumeID == 0 { - return fmt.Errorf("volume_id is required") +// collectVolumeLocations gets volume location from master (placeholder implementation) +func (t *Task) collectVolumeLocations(volumeId needle.VolumeId) ([]pb.ServerAddress, error) { + // For now, return a placeholder implementation + // Full implementation would call master to get volume locations + return []pb.ServerAddress{pb.ServerAddress(t.sourceServer)}, nil +} + +// cleanupExistingEcShards deletes existing EC shards using planned locations +func (t *Task) cleanupExistingEcShards() error { + if len(t.existingShardLocations) == 0 { + glog.V(1).Infof("No existing EC shards to cleanup for volume %d", t.volumeID) + return nil } - if params.Server == "" { - return fmt.Errorf("server is required") + + glog.V(1).Infof("Cleaning up existing EC shards for volume %d on %d servers", t.volumeID, len(t.existingShardLocations)) + + // Delete existing shards from each location using planned shard locations + for _, location := range t.existingShardLocations { + if len(location.ShardIds) == 0 { + continue + } + + glog.V(1).Infof("Deleting existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID) + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(location.Node), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + ShardIds: location.ShardIds, + }) + return deleteErr + }) + + if err != nil { + glog.Errorf("Failed to delete existing EC shards %v from %s for volume %d: %v", location.ShardIds, location.Node, t.volumeID, err) + // Continue with other servers - don't fail the entire cleanup + } else { + 
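+				// Deletion succeeded on this node; any stale shards on the
+				// remaining nodes are handled by the next loop iteration.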
glog.V(1).Infof("Successfully deleted existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID) + } } + + glog.V(1).Infof("Completed cleanup of existing EC shards for volume %d", t.volumeID) return nil } -// EstimateTime estimates the time needed for the task -func (t *Task) EstimateTime(params types.TaskParams) time.Duration { - // Base time for erasure coding operation - baseTime := 30 * time.Second +// shouldPerformECEncoding checks if the volume meets criteria for EC encoding +func (t *Task) shouldPerformECEncoding(volumeLocations []string) bool { + // For now, always proceed with EC encoding if volume exists + // This can be extended with volume size checks, etc. + return len(volumeLocations) > 0 +} - // Could adjust based on volume size or other factors - return baseTime +// markVolumeReadonlyOnAllReplicas marks the volume as readonly on all replica servers +func (t *Task) markVolumeReadonlyOnAllReplicas(volumeId needle.VolumeId, volumeLocations []string) error { + glog.V(1).Infof("Marking volume %d readonly on %d servers", volumeId, len(volumeLocations)) + + // Mark volume readonly on all replica servers + for _, location := range volumeLocations { + glog.V(1).Infof("Marking volume %d readonly on %s", volumeId, location) + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, markErr := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: uint32(volumeId), + }) + return markErr + }) + + if err != nil { + glog.Errorf("Failed to mark volume %d readonly on %s: %v", volumeId, location, err) + return fmt.Errorf("failed to mark volume %d readonly on %s: %v", volumeId, location, err) + } + + glog.V(1).Infof("Successfully marked volume %d readonly on %s", volumeId, location) + } + + glog.V(1).Infof("Successfully marked volume %d readonly on all %d locations", volumeId, len(volumeLocations)) + return nil +} + +// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker +func (t *Task) copyVolumeFilesToWorker(workDir string) (map[string]string, error) { + localFiles := make(map[string]string) + + // Copy .dat file + datFile := fmt.Sprintf("%s.dat", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID))) + err := t.copyFileFromSource(".dat", datFile) + if err != nil { + return nil, fmt.Errorf("failed to copy .dat file: %v", err) + } + localFiles["dat"] = datFile + glog.V(1).Infof("Copied .dat file to: %s", datFile) + + // Copy .idx file + idxFile := fmt.Sprintf("%s.idx", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID))) + err = t.copyFileFromSource(".idx", idxFile) + if err != nil { + return nil, fmt.Errorf("failed to copy .idx file: %v", err) + } + localFiles["idx"] = idxFile + glog.V(1).Infof("Copied .idx file to: %s", idxFile) + + return localFiles, nil +} + +// copyFileFromSource copies a file from source server to local path using gRPC streaming +func (t *Task) copyFileFromSource(ext, localPath string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(t.sourceServer), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + Ext: ext, + StopOffset: uint64(math.MaxInt64), + }) + if err != nil { + return fmt.Errorf("failed to initiate file copy: %v", err) + } + + // Create local file + 
localFile, err := os.Create(localPath) + if err != nil { + return fmt.Errorf("failed to create local file %s: %v", localPath, err) + } + defer localFile.Close() + + // Stream data and write to local file + totalBytes := int64(0) + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to receive file data: %v", err) + } + + if len(resp.FileContent) > 0 { + written, writeErr := localFile.Write(resp.FileContent) + if writeErr != nil { + return fmt.Errorf("failed to write to local file: %v", writeErr) + } + totalBytes += int64(written) + } + } + + glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.sourceServer, localPath) + return nil + }) +} + +// generateEcShardsLocally generates EC shards from local volume files +func (t *Task) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) { + datFile := localFiles["dat"] + idxFile := localFiles["idx"] + + if datFile == "" || idxFile == "" { + return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile) + } + + // Get base name without extension for EC operations + baseName := strings.TrimSuffix(datFile, ".dat") + + shardFiles := make(map[string]string) + + glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile) + + // Generate EC shard files (.ec00 ~ .ec13) + if err := erasure_coding.WriteEcFiles(baseName); err != nil { + return nil, fmt.Errorf("failed to generate EC shard files: %v", err) + } + + // Generate .ecx file from .idx + if err := erasure_coding.WriteSortedFileFromIdx(idxFile, ".ecx"); err != nil { + return nil, fmt.Errorf("failed to generate .ecx file: %v", err) + } + + // Collect generated shard file paths + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardFile := fmt.Sprintf("%s.ec%02d", baseName, i) + if _, err := os.Stat(shardFile); err == nil { + shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile + } + } + + // Add metadata files + ecxFile := idxFile + ".ecx" + if _, err := os.Stat(ecxFile); err == nil { + shardFiles["ecx"] = ecxFile + } + + // Generate .vif file (volume info) + vifFile := baseName + ".vif" + // Create basic volume info - in a real implementation, this would come from the original volume + volumeInfo := &volume_server_pb.VolumeInfo{ + Version: uint32(needle.GetCurrentVersion()), + } + if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil { + glog.Warningf("Failed to create .vif file: %v", err) + } else { + shardFiles["vif"] = vifFile + } + + glog.V(1).Infof("Generated %d EC files locally", len(shardFiles)) + return shardFiles, nil +} + +func (t *Task) copyEcShardsToDestinations() error { + if len(t.destinations) == 0 { + return fmt.Errorf("no destinations specified for EC shard distribution") + } + + destinations := t.destinations + + glog.V(1).Infof("Copying EC shards for volume %d to %d destinations", t.volumeID, len(destinations)) + + // Prepare shard IDs (0-13 for EC shards) + var shardIds []uint32 + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardIds = append(shardIds, uint32(i)) + } + + // Distribute shards across destinations + var wg sync.WaitGroup + errorChan := make(chan error, len(destinations)) + + // Track which disks have already received metadata files (server+disk) + metadataFilesCopied := make(map[string]bool) + var metadataMutex sync.Mutex + + // For each destination, copy a subset of shards + shardsPerDest := len(shardIds) / len(destinations) + remainder := 
len(shardIds) % len(destinations) + + shardOffset := 0 + for i, dest := range destinations { + wg.Add(1) + + shardsForThisDest := shardsPerDest + if i < remainder { + shardsForThisDest++ // Distribute remainder shards + } + + destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest] + shardOffset += shardsForThisDest + + go func(destination *worker_pb.ECDestination, targetShardIds []uint32) { + defer wg.Done() + + if t.IsCancelled() { + errorChan <- fmt.Errorf("task cancelled during shard copy") + return + } + + // Create disk-specific metadata key (server+disk) + diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId) + + glog.V(1).Infof("Copying shards %v from %s to %s (disk %d)", + targetShardIds, t.sourceServer, destination.Node, destination.DiskId) + + // Check if this disk needs metadata files (only once per disk) + metadataMutex.Lock() + needsMetadataFiles := !metadataFilesCopied[diskKey] + if needsMetadataFiles { + metadataFilesCopied[diskKey] = true + } + metadataMutex.Unlock() + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(t.volumeID), + Collection: t.collection, + ShardIds: targetShardIds, + CopyEcxFile: needsMetadataFiles, // Copy .ecx only once per disk + CopyEcjFile: needsMetadataFiles, // Copy .ecj only once per disk + CopyVifFile: needsMetadataFiles, // Copy .vif only once per disk + SourceDataNode: t.sourceServer, + DiskId: destination.DiskId, // Pass target disk ID + }) + return copyErr + }) + + if err != nil { + errorChan <- fmt.Errorf("failed to copy shards to %s disk %d: %v", destination.Node, destination.DiskId, err) + return + } + + if needsMetadataFiles { + glog.V(1).Infof("Successfully copied shards %v and metadata files (.ecx, .ecj, .vif) to %s disk %d", + targetShardIds, destination.Node, destination.DiskId) + } else { + glog.V(1).Infof("Successfully copied shards %v to %s disk %d (metadata files already present)", + targetShardIds, destination.Node, destination.DiskId) + } + }(dest, destShardIds) + } + + wg.Wait() + close(errorChan) + + // Check for any copy errors + if err := <-errorChan; err != nil { + return err + } + + glog.V(1).Infof("Successfully copied all EC shards for volume %d", t.volumeID) + return nil +} + +// distributeEcShardsFromWorker distributes locally generated EC shards to destination servers +func (t *Task) distributeEcShardsFromWorker(localShardFiles map[string]string) error { + if len(t.destinations) == 0 { + return fmt.Errorf("no destinations specified for EC shard distribution") + } + + destinations := t.destinations + + glog.V(1).Infof("Distributing EC shards for volume %d from worker to %d destinations", t.volumeID, len(destinations)) + + // Prepare shard IDs (0-13 for EC shards) + var shardIds []uint32 + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + shardIds = append(shardIds, uint32(i)) + } + + // Distribute shards across destinations + var wg sync.WaitGroup + errorChan := make(chan error, len(destinations)) + + // Track which disks have already received metadata files (server+disk) + metadataFilesCopied := make(map[string]bool) + var metadataMutex sync.Mutex + + // For each destination, send a subset of shards + shardsPerDest := len(shardIds) / len(destinations) + remainder := len(shardIds) % len(destinations) + + shardOffset := 0 + for i, dest := range 
destinations { + wg.Add(1) + + shardsForThisDest := shardsPerDest + if i < remainder { + shardsForThisDest++ // Distribute remainder shards + } + + destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest] + shardOffset += shardsForThisDest + + go func(destination *worker_pb.ECDestination, targetShardIds []uint32) { + defer wg.Done() + + if t.IsCancelled() { + errorChan <- fmt.Errorf("task cancelled during shard distribution") + return + } + + // Create disk-specific metadata key (server+disk) + diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId) + + glog.V(1).Infof("Distributing shards %v from worker to %s (disk %d)", + targetShardIds, destination.Node, destination.DiskId) + + // Check if this disk needs metadata files (only once per disk) + metadataMutex.Lock() + needsMetadataFiles := !metadataFilesCopied[diskKey] + if needsMetadataFiles { + metadataFilesCopied[diskKey] = true + } + metadataMutex.Unlock() + + // Send shard files to destination using HTTP upload (simplified for now) + err := t.sendShardsToDestination(destination, targetShardIds, localShardFiles, needsMetadataFiles) + if err != nil { + errorChan <- fmt.Errorf("failed to send shards to %s disk %d: %v", destination.Node, destination.DiskId, err) + return + } + + if needsMetadataFiles { + glog.V(1).Infof("Successfully distributed shards %v and metadata files (.ecx, .vif) to %s disk %d", + targetShardIds, destination.Node, destination.DiskId) + } else { + glog.V(1).Infof("Successfully distributed shards %v to %s disk %d (metadata files already present)", + targetShardIds, destination.Node, destination.DiskId) + } + }(dest, destShardIds) + } + + wg.Wait() + close(errorChan) + + // Check for any distribution errors + if err := <-errorChan; err != nil { + return err + } + + glog.V(1).Infof("Completed distributing EC shards for volume %d", t.volumeID) + return nil +} + +// sendShardsToDestination sends specific shard files from worker to a destination server (simplified) +func (t *Task) sendShardsToDestination(destination *worker_pb.ECDestination, shardIds []uint32, localFiles map[string]string, includeMetadata bool) error { + // For now, use a simplified approach - just upload the files + // In a full implementation, this would use proper file upload mechanisms + glog.V(2).Infof("Would send shards %v and metadata=%v to %s disk %d", shardIds, includeMetadata, destination.Node, destination.DiskId) + + // TODO: Implement actual file upload to volume server + // This is a placeholder - actual implementation would: + // 1. Open each shard file locally + // 2. Upload via HTTP POST or gRPC stream to destination volume server + // 3. 
Volume server would save to the specified disk_id + + return nil +} + +// mountEcShardsOnDestinations mounts EC shards on all destination servers +func (t *Task) mountEcShardsOnDestinations() error { + if len(t.destinations) == 0 { + return fmt.Errorf("no destinations specified for mounting EC shards") + } + + destinations := t.destinations + + glog.V(1).Infof("Mounting EC shards for volume %d on %d destinations", t.volumeID, len(destinations)) + + // Prepare all shard IDs (0-13) + var allShardIds []uint32 + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + allShardIds = append(allShardIds, uint32(i)) + } + + var wg sync.WaitGroup + errorChan := make(chan error, len(destinations)) + + // Mount shards on each destination server + for _, dest := range destinations { + wg.Add(1) + + go func(destination *worker_pb.ECDestination) { + defer wg.Done() + + if t.IsCancelled() { + errorChan <- fmt.Errorf("task cancelled during shard mounting") + return + } + + glog.V(1).Infof("Mounting EC shards on %s disk %d", destination.Node, destination.DiskId) + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: uint32(t.volumeID), + Collection: t.collection, + ShardIds: allShardIds, // Mount all available shards on each server + }) + return mountErr + }) + + if err != nil { + // It's normal for some servers to not have all shards, so log as warning rather than error + glog.Warningf("Failed to mount some shards on %s disk %d (this may be normal): %v", destination.Node, destination.DiskId, err) + } else { + glog.V(1).Infof("Successfully mounted EC shards on %s disk %d", destination.Node, destination.DiskId) + } + }(dest) + } + + wg.Wait() + close(errorChan) + + // Check for any critical mounting errors + select { + case err := <-errorChan: + if err != nil { + glog.Warningf("Some shard mounting issues occurred: %v", err) + } + default: + // No errors + } + + glog.V(1).Infof("Completed mounting EC shards for volume %d", t.volumeID) + return nil +} + +// deleteVolumeFromAllLocations deletes the original volume from all replica servers +func (t *Task) deleteVolumeFromAllLocations(volumeId needle.VolumeId, volumeLocations []string) error { + glog.V(1).Infof("Deleting original volume %d from %d locations", volumeId, len(volumeLocations)) + + for _, location := range volumeLocations { + glog.V(1).Infof("Deleting volume %d from %s", volumeId, location) + + err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt, + func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ + VolumeId: uint32(volumeId), + OnlyEmpty: false, // Force delete even if not empty since we've already created EC shards + }) + return deleteErr + }) + + if err != nil { + glog.Errorf("Failed to delete volume %d from %s: %v", volumeId, location, err) + return fmt.Errorf("failed to delete volume %d from %s: %v", volumeId, location, err) + } + + glog.V(1).Infof("Successfully deleted volume %d from %s", volumeId, location) + } + + glog.V(1).Infof("Successfully deleted volume %d from all %d locations", volumeId, len(volumeLocations)) + return nil +} + +// Register the task in the global registry +func init() { + types.RegisterGlobalTypedTask(types.TaskTypeErasureCoding, NewTask) + 
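Both copyEcShardsToDestinations and distributeEcShardsFromWorker above split the shard IDs across destinations with the same quotient/remainder arithmetic: every destination receives len(shardIds)/len(destinations) shards, and the first len(shardIds)%len(destinations) destinations receive one extra. A minimal standalone sketch of that split, assuming the 14-shard (10 data + 4 parity) layout used in this change; splitShards is a hypothetical helper name, not part of this diff:

package main

import "fmt"

// splitShards assigns shard IDs to n destinations the way the EC task does:
// an even quotient per destination, with the remainder handed out one shard
// at a time to the earliest destinations.
func splitShards(shardIds []uint32, n int) [][]uint32 {
	perDest := len(shardIds) / n
	remainder := len(shardIds) % n
	assignments := make([][]uint32, 0, n)
	offset := 0
	for i := 0; i < n; i++ {
		count := perDest
		if i < remainder {
			count++ // this destination absorbs one remainder shard
		}
		assignments = append(assignments, shardIds[offset:offset+count])
		offset += count
	}
	return assignments
}

func main() {
	shardIds := make([]uint32, 0, 14)
	for i := 0; i < 14; i++ {
		shardIds = append(shardIds, uint32(i))
	}
	// 14 shards over 4 destinations yields chunks of 4, 4, 3, 3.
	for i, ids := range splitShards(shardIds, 4) {
		fmt.Printf("destination %d: shards %v\n", i, ids)
	}
}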
glog.V(1).Infof("Registered EC task") } diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go deleted file mode 100644 index 0f8b5e376..000000000 --- a/weed/worker/tasks/erasure_coding/ec_detector.go +++ /dev/null @@ -1,139 +0,0 @@ -package erasure_coding - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// EcDetector implements erasure coding task detection -type EcDetector struct { - enabled bool - volumeAgeHours int - fullnessRatio float64 - scanInterval time.Duration -} - -// Compile-time interface assertions -var ( - _ types.TaskDetector = (*EcDetector)(nil) -) - -// NewEcDetector creates a new erasure coding detector -func NewEcDetector() *EcDetector { - return &EcDetector{ - enabled: false, // Conservative default - volumeAgeHours: 24 * 7, // 1 week - fullnessRatio: 0.9, // 90% full - scanInterval: 2 * time.Hour, - } -} - -// GetTaskType returns the task type -func (d *EcDetector) GetTaskType() types.TaskType { - return types.TaskTypeErasureCoding -} - -// ScanForTasks scans for volumes that should be converted to erasure coding -func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { - if !d.enabled { - return nil, nil - } - - var results []*types.TaskDetectionResult - now := time.Now() - ageThreshold := time.Duration(d.volumeAgeHours) * time.Hour - - for _, metric := range volumeMetrics { - // Skip if already EC volume - if metric.IsECVolume { - continue - } - - // Check age and fullness criteria - if metric.Age >= ageThreshold && metric.FullnessRatio >= d.fullnessRatio { - // Check if volume is read-only (safe for EC conversion) - if !metric.IsReadOnly { - continue - } - - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeErasureCoding, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: types.TaskPriorityLow, // EC is not urgent - Reason: "Volume is old and full enough for EC conversion", - Parameters: map[string]interface{}{ - "age_hours": int(metric.Age.Hours()), - "fullness_ratio": metric.FullnessRatio, - }, - ScheduleAt: now, - } - results = append(results, result) - } - } - - glog.V(2).Infof("EC detector found %d tasks to schedule", len(results)) - return results, nil -} - -// ScanInterval returns how often this task type should be scanned -func (d *EcDetector) ScanInterval() time.Duration { - return d.scanInterval -} - -// IsEnabled returns whether this task type is enabled -func (d *EcDetector) IsEnabled() bool { - return d.enabled -} - -// Configuration setters - -func (d *EcDetector) SetEnabled(enabled bool) { - d.enabled = enabled -} - -func (d *EcDetector) SetVolumeAgeHours(hours int) { - d.volumeAgeHours = hours -} - -func (d *EcDetector) SetFullnessRatio(ratio float64) { - d.fullnessRatio = ratio -} - -func (d *EcDetector) SetScanInterval(interval time.Duration) { - d.scanInterval = interval -} - -// GetVolumeAgeHours returns the current volume age threshold in hours -func (d *EcDetector) GetVolumeAgeHours() int { - return d.volumeAgeHours -} - -// GetFullnessRatio returns the current fullness ratio threshold -func (d *EcDetector) GetFullnessRatio() float64 { - return d.fullnessRatio -} - -// GetScanInterval returns the scan interval -func (d *EcDetector) GetScanInterval() time.Duration { - return d.scanInterval -} - -// ConfigureFromPolicy configures the detector based on the 
maintenance policy -func (d *EcDetector) ConfigureFromPolicy(policy interface{}) { - // Type assert to the maintenance policy type we expect - if maintenancePolicy, ok := policy.(interface { - GetECEnabled() bool - GetECVolumeAgeHours() int - GetECFullnessRatio() float64 - }); ok { - d.SetEnabled(maintenancePolicy.GetECEnabled()) - d.SetVolumeAgeHours(maintenancePolicy.GetECVolumeAgeHours()) - d.SetFullnessRatio(maintenancePolicy.GetECFullnessRatio()) - } else { - glog.V(1).Infof("Could not configure EC detector from policy: unsupported policy type") - } -} diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go index 6c4b5bf7f..62cfe6b56 100644 --- a/weed/worker/tasks/erasure_coding/ec_register.go +++ b/weed/worker/tasks/erasure_coding/ec_register.go @@ -2,80 +2,71 @@ package erasure_coding import ( "fmt" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// Factory creates erasure coding task instances -type Factory struct { - *tasks.BaseTaskFactory -} +// Global variable to hold the task definition for configuration updates +var globalTaskDef *base.TaskDefinition -// NewFactory creates a new erasure coding task factory -func NewFactory() *Factory { - return &Factory{ - BaseTaskFactory: tasks.NewBaseTaskFactory( - types.TaskTypeErasureCoding, - []string{"erasure_coding", "storage", "durability"}, - "Convert volumes to erasure coded format for improved durability", - ), - } -} - -// Create creates a new erasure coding task instance -func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - task := NewTask(params.Server, params.VolumeID) - task.SetEstimatedDuration(task.EstimateTime(params)) +// Auto-register this task when the package is imported +func init() { + RegisterErasureCodingTask() - return task, nil + // Register config updater + tasks.AutoRegisterConfigUpdater(types.TaskTypeErasureCoding, UpdateConfigFromPersistence) } -// Shared detector and scheduler instances -var ( - sharedDetector *EcDetector - sharedScheduler *Scheduler -) +// RegisterErasureCodingTask registers the erasure coding task with the new architecture +func RegisterErasureCodingTask() { + // Create configuration instance + config := NewDefaultConfig() -// getSharedInstances returns the shared detector and scheduler instances -func getSharedInstances() (*EcDetector, *Scheduler) { - if sharedDetector == nil { - sharedDetector = NewEcDetector() - } - if sharedScheduler == nil { - sharedScheduler = NewScheduler() + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeErasureCoding, + Name: "erasure_coding", + DisplayName: "Erasure Coding", + Description: "Applies erasure coding to volumes for data protection", + Icon: "fas fa-shield-alt text-success", + Capabilities: []string{"erasure_coding", "data_protection"}, + + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: nil, // Uses typed task system - see init() in ec.go + DetectionFunc: Detection, + ScanInterval: 1 * time.Hour, + SchedulingFunc: Scheduling, + MaxConcurrent: 1, + RepeatInterval: 24 * time.Hour, } - return sharedDetector, sharedScheduler -} -// GetSharedInstances returns the 
shared detector and scheduler instances (public access) -func GetSharedInstances() (*EcDetector, *Scheduler) { - return getSharedInstances() + // Store task definition globally for configuration updates + globalTaskDef = taskDef + + // Register everything with a single function call! + base.RegisterTask(taskDef) } -// Auto-register this task when the package is imported -func init() { - factory := NewFactory() - tasks.AutoRegister(types.TaskTypeErasureCoding, factory) +// UpdateConfigFromPersistence updates the erasure coding configuration from persistence +func UpdateConfigFromPersistence(configPersistence interface{}) error { + if globalTaskDef == nil { + return fmt.Errorf("erasure coding task not registered") + } - // Get shared instances for all registrations - detector, scheduler := getSharedInstances() + // Load configuration from persistence + newConfig := LoadConfigFromPersistence(configPersistence) + if newConfig == nil { + return fmt.Errorf("failed to load configuration from persistence") + } - // Register with types registry - tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { - registry.RegisterTask(detector, scheduler) - }) + // Update the task definition's config + globalTaskDef.Config = newConfig - // Register with UI registry using the same instances - tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { - RegisterUI(uiRegistry, detector, scheduler) - }) + glog.V(1).Infof("Updated erasure coding task configuration from persistence") + return nil } diff --git a/weed/worker/tasks/erasure_coding/ec_scheduler.go b/weed/worker/tasks/erasure_coding/ec_scheduler.go deleted file mode 100644 index b2366bb06..000000000 --- a/weed/worker/tasks/erasure_coding/ec_scheduler.go +++ /dev/null @@ -1,114 +0,0 @@ -package erasure_coding - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// Scheduler implements erasure coding task scheduling -type Scheduler struct { - maxConcurrent int - enabled bool -} - -// NewScheduler creates a new erasure coding scheduler -func NewScheduler() *Scheduler { - return &Scheduler{ - maxConcurrent: 1, // Conservative default - enabled: false, // Conservative default - } -} - -// GetTaskType returns the task type -func (s *Scheduler) GetTaskType() types.TaskType { - return types.TaskTypeErasureCoding -} - -// CanScheduleNow determines if an erasure coding task can be scheduled now -func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { - if !s.enabled { - return false - } - - // Check if we have available workers - if len(availableWorkers) == 0 { - return false - } - - // Count running EC tasks - runningCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeErasureCoding { - runningCount++ - } - } - - // Check concurrency limit - if runningCount >= s.maxConcurrent { - glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent) - return false - } - - // Check if any worker can handle EC tasks - for _, worker := range availableWorkers { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeErasureCoding { - glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID) - return true - } - } - } - - return false -} - -// GetMaxConcurrent returns the maximum number of concurrent tasks -func (s *Scheduler) GetMaxConcurrent() int { - return s.maxConcurrent -} - -// GetDefaultRepeatInterval 
returns the default interval to wait before repeating EC tasks -func (s *Scheduler) GetDefaultRepeatInterval() time.Duration { - return 24 * time.Hour // Don't repeat EC for 24 hours -} - -// GetPriority returns the priority for this task -func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority { - return types.TaskPriorityLow // EC is not urgent -} - -// WasTaskRecentlyCompleted checks if a similar task was recently completed -func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool { - // Don't repeat EC for 24 hours - interval := 24 * time.Hour - cutoff := now.Add(-interval) - - for _, completedTask := range completedTasks { - if completedTask.Type == types.TaskTypeErasureCoding && - completedTask.VolumeID == task.VolumeID && - completedTask.Server == task.Server && - completedTask.Status == types.TaskStatusCompleted && - completedTask.CompletedAt != nil && - completedTask.CompletedAt.After(cutoff) { - return true - } - } - return false -} - -// IsEnabled returns whether this task type is enabled -func (s *Scheduler) IsEnabled() bool { - return s.enabled -} - -// Configuration setters - -func (s *Scheduler) SetEnabled(enabled bool) { - s.enabled = enabled -} - -func (s *Scheduler) SetMaxConcurrent(max int) { - s.maxConcurrent = max -} diff --git a/weed/worker/tasks/erasure_coding/ui.go b/weed/worker/tasks/erasure_coding/ui.go deleted file mode 100644 index e17cba89a..000000000 --- a/weed/worker/tasks/erasure_coding/ui.go +++ /dev/null @@ -1,309 +0,0 @@ -package erasure_coding - -import ( - "fmt" - "html/template" - "strconv" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// UIProvider provides the UI for erasure coding task configuration -type UIProvider struct { - detector *EcDetector - scheduler *Scheduler -} - -// NewUIProvider creates a new erasure coding UI provider -func NewUIProvider(detector *EcDetector, scheduler *Scheduler) *UIProvider { - return &UIProvider{ - detector: detector, - scheduler: scheduler, - } -} - -// GetTaskType returns the task type -func (ui *UIProvider) GetTaskType() types.TaskType { - return types.TaskTypeErasureCoding -} - -// GetDisplayName returns the human-readable name -func (ui *UIProvider) GetDisplayName() string { - return "Erasure Coding" -} - -// GetDescription returns a description of what this task does -func (ui *UIProvider) GetDescription() string { - return "Converts volumes to erasure coded format for improved data durability and fault tolerance" -} - -// GetIcon returns the icon CSS class for this task type -func (ui *UIProvider) GetIcon() string { - return "fas fa-shield-alt text-info" -} - -// ErasureCodingConfig represents the erasure coding configuration -type ErasureCodingConfig struct { - Enabled bool `json:"enabled"` - VolumeAgeHoursSeconds int `json:"volume_age_hours_seconds"` - FullnessRatio float64 `json:"fullness_ratio"` - ScanIntervalSeconds int `json:"scan_interval_seconds"` - MaxConcurrent int `json:"max_concurrent"` - ShardCount int `json:"shard_count"` - ParityCount int `json:"parity_count"` - CollectionFilter string `json:"collection_filter"` -} - -// Helper functions for duration conversion -func secondsToDuration(seconds int) time.Duration { - return time.Duration(seconds) * time.Second -} - -func durationToSeconds(d time.Duration) int { - return int(d.Seconds()) -} - -// formatDurationForUser formats seconds as a user-friendly duration string -func formatDurationForUser(seconds int) 
string { - d := secondsToDuration(seconds) - if d < time.Minute { - return fmt.Sprintf("%ds", seconds) - } - if d < time.Hour { - return fmt.Sprintf("%.0fm", d.Minutes()) - } - if d < 24*time.Hour { - return fmt.Sprintf("%.1fh", d.Hours()) - } - return fmt.Sprintf("%.1fd", d.Hours()/24) -} - -// RenderConfigForm renders the configuration form HTML -func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) { - config := ui.getCurrentECConfig() - - // Build form using the FormBuilder helper - form := types.NewFormBuilder() - - // Detection Settings - form.AddCheckboxField( - "enabled", - "Enable Erasure Coding Tasks", - "Whether erasure coding tasks should be automatically created", - config.Enabled, - ) - - form.AddNumberField( - "volume_age_hours_seconds", - "Volume Age Threshold", - "Only apply erasure coding to volumes older than this duration", - float64(config.VolumeAgeHoursSeconds), - true, - ) - - form.AddNumberField( - "scan_interval_seconds", - "Scan Interval", - "How often to scan for volumes needing erasure coding", - float64(config.ScanIntervalSeconds), - true, - ) - - // Scheduling Settings - form.AddNumberField( - "max_concurrent", - "Max Concurrent Tasks", - "Maximum number of erasure coding tasks that can run simultaneously", - float64(config.MaxConcurrent), - true, - ) - - // Erasure Coding Parameters - form.AddNumberField( - "shard_count", - "Data Shards", - "Number of data shards for erasure coding (recommended: 10)", - float64(config.ShardCount), - true, - ) - - form.AddNumberField( - "parity_count", - "Parity Shards", - "Number of parity shards for erasure coding (recommended: 4)", - float64(config.ParityCount), - true, - ) - - // Generate organized form sections using Bootstrap components - html := ` -<div class="row"> - <div class="col-12"> - <div class="card mb-4"> - <div class="card-header"> - <h5 class="mb-0"> - <i class="fas fa-shield-alt me-2"></i> - Erasure Coding Configuration - </h5> - </div> - <div class="card-body"> -` + string(form.Build()) + ` - </div> - </div> - </div> -</div> - -<div class="row"> - <div class="col-12"> - <div class="card mb-3"> - <div class="card-header"> - <h5 class="mb-0"> - <i class="fas fa-info-circle me-2"></i> - Performance Impact - </h5> - </div> - <div class="card-body"> - <div class="alert alert-info" role="alert"> - <h6 class="alert-heading">Important Notes:</h6> - <p class="mb-2"><strong>Performance:</strong> Erasure coding is CPU and I/O intensive. 
Consider running during off-peak hours.</p> - <p class="mb-0"><strong>Durability:</strong> With ` + fmt.Sprintf("%d+%d", config.ShardCount, config.ParityCount) + ` configuration, can tolerate up to ` + fmt.Sprintf("%d", config.ParityCount) + ` shard failures.</p> - </div> - </div> - </div> - </div> -</div>` - - return template.HTML(html), nil -} - -// ParseConfigForm parses form data into configuration -func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) { - config := &ErasureCodingConfig{} - - // Parse enabled - config.Enabled = len(formData["enabled"]) > 0 - - // Parse volume age hours - if values, ok := formData["volume_age_hours_seconds"]; ok && len(values) > 0 { - hours, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid volume age hours: %w", err) - } - config.VolumeAgeHoursSeconds = hours - } - - // Parse scan interval - if values, ok := formData["scan_interval_seconds"]; ok && len(values) > 0 { - interval, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid scan interval: %w", err) - } - config.ScanIntervalSeconds = interval - } - - // Parse max concurrent - if values, ok := formData["max_concurrent"]; ok && len(values) > 0 { - maxConcurrent, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid max concurrent: %w", err) - } - if maxConcurrent < 1 { - return nil, fmt.Errorf("max concurrent must be at least 1") - } - config.MaxConcurrent = maxConcurrent - } - - // Parse shard count - if values, ok := formData["shard_count"]; ok && len(values) > 0 { - shardCount, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid shard count: %w", err) - } - if shardCount < 1 { - return nil, fmt.Errorf("shard count must be at least 1") - } - config.ShardCount = shardCount - } - - // Parse parity count - if values, ok := formData["parity_count"]; ok && len(values) > 0 { - parityCount, err := strconv.Atoi(values[0]) - if err != nil { - return nil, fmt.Errorf("invalid parity count: %w", err) - } - if parityCount < 1 { - return nil, fmt.Errorf("parity count must be at least 1") - } - config.ParityCount = parityCount - } - - return config, nil -} - -// GetCurrentConfig returns the current configuration -func (ui *UIProvider) GetCurrentConfig() interface{} { - return ui.getCurrentECConfig() -} - -// ApplyConfig applies the new configuration -func (ui *UIProvider) ApplyConfig(config interface{}) error { - ecConfig, ok := config.(ErasureCodingConfig) - if !ok { - return fmt.Errorf("invalid config type, expected ErasureCodingConfig") - } - - // Apply to detector - if ui.detector != nil { - ui.detector.SetEnabled(ecConfig.Enabled) - ui.detector.SetVolumeAgeHours(ecConfig.VolumeAgeHoursSeconds) - ui.detector.SetScanInterval(secondsToDuration(ecConfig.ScanIntervalSeconds)) - } - - // Apply to scheduler - if ui.scheduler != nil { - ui.scheduler.SetEnabled(ecConfig.Enabled) - ui.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent) - } - - glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, age_threshold=%v, max_concurrent=%d, shards=%d+%d", - ecConfig.Enabled, ecConfig.VolumeAgeHoursSeconds, ecConfig.MaxConcurrent, ecConfig.ShardCount, ecConfig.ParityCount) - - return nil -} - -// getCurrentECConfig gets the current configuration from detector and scheduler -func (ui *UIProvider) getCurrentECConfig() ErasureCodingConfig { - config := ErasureCodingConfig{ - // Default values (fallback if detectors/schedulers are nil) - Enabled: true, - 
VolumeAgeHoursSeconds: 24 * 3600, // 24 hours in seconds - ScanIntervalSeconds: 2 * 3600, // 2 hours in seconds - MaxConcurrent: 1, - ShardCount: 10, - ParityCount: 4, - } - - // Get current values from detector - if ui.detector != nil { - config.Enabled = ui.detector.IsEnabled() - config.VolumeAgeHoursSeconds = ui.detector.GetVolumeAgeHours() - config.ScanIntervalSeconds = durationToSeconds(ui.detector.ScanInterval()) - } - - // Get current values from scheduler - if ui.scheduler != nil { - config.MaxConcurrent = ui.scheduler.GetMaxConcurrent() - } - - return config -} - -// RegisterUI registers the erasure coding UI provider with the UI registry -func RegisterUI(uiRegistry *types.UIRegistry, detector *EcDetector, scheduler *Scheduler) { - uiProvider := NewUIProvider(detector, scheduler) - uiRegistry.RegisterUI(uiProvider) - - glog.V(1).Infof("
Registered erasure coding task UI provider") -} diff --git a/weed/worker/tasks/schema_provider.go b/weed/worker/tasks/schema_provider.go new file mode 100644 index 000000000..4d69556b1 --- /dev/null +++ b/weed/worker/tasks/schema_provider.go @@ -0,0 +1,51 @@ +package tasks + +import ( + "sync" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" +) + +// TaskConfigSchema defines the schema for task configuration +type TaskConfigSchema struct { + config.Schema // Embed common schema functionality + TaskName string `json:"task_name"` + DisplayName string `json:"display_name"` + Description string `json:"description"` + Icon string `json:"icon"` +} + +// TaskConfigSchemaProvider is an interface for providing task configuration schemas +type TaskConfigSchemaProvider interface { + GetConfigSchema() *TaskConfigSchema +} + +// schemaRegistry maintains a registry of schema providers by task type +type schemaRegistry struct { + providers map[string]TaskConfigSchemaProvider + mutex sync.RWMutex +} + +var globalSchemaRegistry = &schemaRegistry{ + providers: make(map[string]TaskConfigSchemaProvider), +} + +// RegisterTaskConfigSchema registers a schema provider for a task type +func RegisterTaskConfigSchema(taskType string, provider TaskConfigSchemaProvider) { + globalSchemaRegistry.mutex.Lock() + defer globalSchemaRegistry.mutex.Unlock() + globalSchemaRegistry.providers[taskType] = provider +} + +// GetTaskConfigSchema returns the schema for the specified task type +func GetTaskConfigSchema(taskType string) *TaskConfigSchema { + globalSchemaRegistry.mutex.RLock() + provider, exists := globalSchemaRegistry.providers[taskType] + globalSchemaRegistry.mutex.RUnlock() + + if !exists { + return nil + } + + return provider.GetConfigSchema() +} diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go index 482233f60..15369c137 100644 --- a/weed/worker/tasks/task.go +++ b/weed/worker/tasks/task.go @@ -2,29 +2,69 @@ package tasks import ( "context" + "fmt" "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) // BaseTask provides common functionality for all tasks type BaseTask struct { taskType types.TaskType + taskID string progress float64 cancelled bool mutex sync.RWMutex startTime time.Time estimatedDuration time.Duration + logger TaskLogger + loggerConfig TaskLoggerConfig + progressCallback func(float64) // Callback function for progress updates } // NewBaseTask creates a new base task func NewBaseTask(taskType types.TaskType) *BaseTask { return &BaseTask{ - taskType: taskType, - progress: 0.0, - cancelled: false, + taskType: taskType, + progress: 0.0, + cancelled: false, + loggerConfig: DefaultTaskLoggerConfig(), + } +} + +// NewBaseTaskWithLogger creates a new base task with custom logger configuration +func NewBaseTaskWithLogger(taskType types.TaskType, loggerConfig TaskLoggerConfig) *BaseTask { + return &BaseTask{ + taskType: taskType, + progress: 0.0, + cancelled: false, + loggerConfig: loggerConfig, + } +} + +// InitializeLogger initializes the task logger with task details +func (t *BaseTask) InitializeLogger(taskID string, workerID string, params types.TaskParams) error { + return t.InitializeTaskLogger(taskID, workerID, params) +} + +// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface) +func (t *BaseTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.taskID = taskID + + logger, err := 
NewTaskLogger(taskID, t.taskType, workerID, params, t.loggerConfig) + if err != nil { + return fmt.Errorf("failed to initialize task logger: %w", err) } + + t.logger = logger + t.logger.Info("BaseTask initialized for task %s (type: %s)", taskID, t.taskType) + + return nil } // Type returns the task type @@ -39,24 +79,47 @@ func (t *BaseTask) GetProgress() float64 { return t.progress } -// SetProgress sets the current progress +// SetProgress sets the current progress and logs it func (t *BaseTask) SetProgress(progress float64) { t.mutex.Lock() - defer t.mutex.Unlock() if progress < 0 { progress = 0 } if progress > 100 { progress = 100 } + oldProgress := t.progress + callback := t.progressCallback t.progress = progress + t.mutex.Unlock() + + // Log progress change + if t.logger != nil && progress != oldProgress { + t.logger.LogProgress(progress, fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress)) + } + + // Call progress callback if set + if callback != nil && progress != oldProgress { + callback(progress) + } } // Cancel cancels the task func (t *BaseTask) Cancel() error { t.mutex.Lock() defer t.mutex.Unlock() + + if t.cancelled { + return nil + } + t.cancelled = true + + if t.logger != nil { + t.logger.LogStatus("cancelled", "Task cancelled by request") + t.logger.Warning("Task %s was cancelled", t.taskID) + } + return nil } @@ -72,6 +135,10 @@ func (t *BaseTask) SetStartTime(startTime time.Time) { t.mutex.Lock() defer t.mutex.Unlock() t.startTime = startTime + + if t.logger != nil { + t.logger.LogStatus("running", fmt.Sprintf("Task started at %s", startTime.Format(time.RFC3339))) + } } // GetStartTime returns the task start time @@ -86,6 +153,13 @@ func (t *BaseTask) SetEstimatedDuration(duration time.Duration) { t.mutex.Lock() defer t.mutex.Unlock() t.estimatedDuration = duration + + if t.logger != nil { + t.logger.LogWithFields("INFO", "Estimated duration set", map[string]interface{}{ + "estimated_duration": duration.String(), + "estimated_seconds": duration.Seconds(), + }) + } } // GetEstimatedDuration returns the estimated duration @@ -95,11 +169,115 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration { return t.estimatedDuration } -// ExecuteTask is a wrapper that handles common task execution logic +// SetProgressCallback sets the progress callback function +func (t *BaseTask) SetProgressCallback(callback func(float64)) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.progressCallback = callback +} + +// SetLoggerConfig sets the logger configuration for this task +func (t *BaseTask) SetLoggerConfig(config TaskLoggerConfig) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.loggerConfig = config +} + +// GetLogger returns the task logger +func (t *BaseTask) GetLogger() TaskLogger { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.logger +} + +// GetTaskLogger returns the task logger (LoggerProvider interface) +func (t *BaseTask) GetTaskLogger() TaskLogger { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.logger +} + +// LogInfo logs an info message +func (t *BaseTask) LogInfo(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Info(message, args...) + } +} + +// LogWarning logs a warning message +func (t *BaseTask) LogWarning(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Warning(message, args...) + } +} + +// LogError logs an error message +func (t *BaseTask) LogError(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Error(message, args...) 
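// SetProgress above clamps the value to [0,100], releases the mutex before
// logging, and only notifies when the value actually changed. The optional
// callback registered via SetProgressCallback lets the owning worker observe
// updates without polling; a sketch of the wiring (hypothetical reporting
// code, not part of this diff):
//
//	task.SetProgressCallback(func(p float64) {
//		// e.g. forward to the admin server or a metrics gauge
//		glog.V(2).Infof("task progress: %.1f%%", p)
//	})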
+ } +} + +// LogDebug logs a debug message +func (t *BaseTask) LogDebug(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Debug(message, args...) + } +} + +// LogWithFields logs a message with structured fields +func (t *BaseTask) LogWithFields(level string, message string, fields map[string]interface{}) { + if t.logger != nil { + t.logger.LogWithFields(level, message, fields) + } +} + +// FinishTask finalizes the task and closes the logger +func (t *BaseTask) FinishTask(success bool, errorMsg string) error { + if t.logger != nil { + if success { + t.logger.LogStatus("completed", "Task completed successfully") + t.logger.Info("Task %s finished successfully", t.taskID) + } else { + t.logger.LogStatus("failed", fmt.Sprintf("Task failed: %s", errorMsg)) + t.logger.Error("Task %s failed: %s", t.taskID, errorMsg) + } + + // Close logger + if err := t.logger.Close(); err != nil { + glog.Errorf("Failed to close task logger: %v", err) + } + } + + return nil +} + +// ExecuteTask is a wrapper that handles common task execution logic with logging func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error { + // Initialize logger if not already done + if t.logger == nil { + // Generate a temporary task ID if none provided + if t.taskID == "" { + t.taskID = fmt.Sprintf("task_%d", time.Now().UnixNano()) + } + + workerID := "unknown" + if err := t.InitializeLogger(t.taskID, workerID, params); err != nil { + glog.Warningf("Failed to initialize task logger: %v", err) + } + } + t.SetStartTime(time.Now()) t.SetProgress(0) + if t.logger != nil { + t.logger.LogWithFields("INFO", "Task execution started", map[string]interface{}{ + "volume_id": params.VolumeID, + "server": params.Server, + "collection": params.Collection, + }) + } + // Create a context that can be cancelled ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -114,21 +292,29 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe // Check cancellation every second } } + t.LogWarning("Task cancellation detected, cancelling context") cancel() }() // Execute the actual task + t.LogInfo("Starting task executor") err := executor(ctx, params) if err != nil { + t.LogError("Task executor failed: %v", err) + t.FinishTask(false, err.Error()) return err } if t.IsCancelled() { + t.LogWarning("Task was cancelled during execution") + t.FinishTask(false, "cancelled") return context.Canceled } t.SetProgress(100) + t.LogInfo("Task executor completed successfully") + t.FinishTask(true, "") return nil } diff --git a/weed/worker/tasks/task_log_handler.go b/weed/worker/tasks/task_log_handler.go new file mode 100644 index 000000000..be5f00f12 --- /dev/null +++ b/weed/worker/tasks/task_log_handler.go @@ -0,0 +1,230 @@ +package tasks + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" +) + +// TaskLogHandler handles task log requests from admin server +type TaskLogHandler struct { + baseLogDir string +} + +// NewTaskLogHandler creates a new task log handler +func NewTaskLogHandler(baseLogDir string) *TaskLogHandler { + if baseLogDir == "" { + baseLogDir = "/tmp/seaweedfs/task_logs" + } + return &TaskLogHandler{ + baseLogDir: baseLogDir, + } +} + +// HandleLogRequest processes a task log request and returns the response +func (h *TaskLogHandler) HandleLogRequest(request *worker_pb.TaskLogRequest) *worker_pb.TaskLogResponse { + 
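// This handler fails soft: each step records an ErrorMessage and returns a
// response with Success=false instead of an error. The flow is:
//  1. locate the task's log directory under baseLogDir (directories are named
//     "<taskID>_<taskType>_<workerID>_<timestamp>", so the lookup matches on
//     the "<taskID>_" prefix),
//  2. when request.IncludeMetadata is set, attach the parsed metadata.json,
//  3. read task.log and apply the request's log-level, time-range, and
//     max-entries filters before converting entries to protobuf.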
response := &worker_pb.TaskLogResponse{ + TaskId: request.TaskId, + WorkerId: request.WorkerId, + Success: false, + } + + // Find the task log directory + logDir, err := h.findTaskLogDirectory(request.TaskId) + if err != nil { + response.ErrorMessage = fmt.Sprintf("Task log directory not found: %v", err) + glog.Warningf("Task log request failed for %s: %v", request.TaskId, err) + return response + } + + // Read metadata if requested + if request.IncludeMetadata { + metadata, err := h.readTaskMetadata(logDir) + if err != nil { + response.ErrorMessage = fmt.Sprintf("Failed to read task metadata: %v", err) + glog.Warningf("Failed to read metadata for task %s: %v", request.TaskId, err) + return response + } + response.Metadata = metadata + } + + // Read log entries + logEntries, err := h.readTaskLogEntries(logDir, request) + if err != nil { + response.ErrorMessage = fmt.Sprintf("Failed to read task logs: %v", err) + glog.Warningf("Failed to read logs for task %s: %v", request.TaskId, err) + return response + } + + response.LogEntries = logEntries + response.Success = true + + glog.V(1).Infof("Successfully retrieved %d log entries for task %s", len(logEntries), request.TaskId) + return response +} + +// findTaskLogDirectory searches for the task log directory by task ID +func (h *TaskLogHandler) findTaskLogDirectory(taskID string) (string, error) { + entries, err := os.ReadDir(h.baseLogDir) + if err != nil { + return "", fmt.Errorf("failed to read base log directory: %w", err) + } + + // Look for directories that start with the task ID + for _, entry := range entries { + if entry.IsDir() && strings.HasPrefix(entry.Name(), taskID+"_") { + return filepath.Join(h.baseLogDir, entry.Name()), nil + } + } + + return "", fmt.Errorf("task log directory not found for task ID: %s", taskID) +} + +// readTaskMetadata reads task metadata from the log directory +func (h *TaskLogHandler) readTaskMetadata(logDir string) (*worker_pb.TaskLogMetadata, error) { + metadata, err := GetTaskLogMetadata(logDir) + if err != nil { + return nil, err + } + + // Convert to protobuf metadata + pbMetadata := &worker_pb.TaskLogMetadata{ + TaskId: metadata.TaskID, + TaskType: metadata.TaskType, + WorkerId: metadata.WorkerID, + StartTime: metadata.StartTime.Unix(), + Status: metadata.Status, + Progress: float32(metadata.Progress), + VolumeId: metadata.VolumeID, + Server: metadata.Server, + Collection: metadata.Collection, + LogFilePath: metadata.LogFilePath, + CreatedAt: metadata.CreatedAt.Unix(), + CustomData: make(map[string]string), + } + + // Set end time and duration if available + if metadata.EndTime != nil { + pbMetadata.EndTime = metadata.EndTime.Unix() + } + if metadata.Duration != nil { + pbMetadata.DurationMs = metadata.Duration.Milliseconds() + } + + // Convert custom data + for key, value := range metadata.CustomData { + if strValue, ok := value.(string); ok { + pbMetadata.CustomData[key] = strValue + } else { + pbMetadata.CustomData[key] = fmt.Sprintf("%v", value) + } + } + + return pbMetadata, nil +} + +// readTaskLogEntries reads and filters log entries based on the request +func (h *TaskLogHandler) readTaskLogEntries(logDir string, request *worker_pb.TaskLogRequest) ([]*worker_pb.TaskLogEntry, error) { + entries, err := ReadTaskLogs(logDir) + if err != nil { + return nil, err + } + + // Apply filters + var filteredEntries []TaskLogEntry + + for _, entry := range entries { + // Filter by log level + if request.LogLevel != "" && !strings.EqualFold(entry.Level, request.LogLevel) { + continue + } + + // Filter by 
time range + if request.StartTime > 0 && entry.Timestamp.Unix() < request.StartTime { + continue + } + if request.EndTime > 0 && entry.Timestamp.Unix() > request.EndTime { + continue + } + + filteredEntries = append(filteredEntries, entry) + } + + // Limit entries if requested + if request.MaxEntries > 0 && len(filteredEntries) > int(request.MaxEntries) { + // Take the most recent entries + start := len(filteredEntries) - int(request.MaxEntries) + filteredEntries = filteredEntries[start:] + } + + // Convert to protobuf entries + var pbEntries []*worker_pb.TaskLogEntry + for _, entry := range filteredEntries { + pbEntry := &worker_pb.TaskLogEntry{ + Timestamp: entry.Timestamp.Unix(), + Level: entry.Level, + Message: entry.Message, + Fields: make(map[string]string), + } + + // Set progress if available + if entry.Progress != nil { + pbEntry.Progress = float32(*entry.Progress) + } + + // Set status if available + if entry.Status != nil { + pbEntry.Status = *entry.Status + } + + // Convert fields + for key, value := range entry.Fields { + if strValue, ok := value.(string); ok { + pbEntry.Fields[key] = strValue + } else { + pbEntry.Fields[key] = fmt.Sprintf("%v", value) + } + } + + pbEntries = append(pbEntries, pbEntry) + } + + return pbEntries, nil +} + +// ListAvailableTaskLogs returns a list of available task log directories +func (h *TaskLogHandler) ListAvailableTaskLogs() ([]string, error) { + entries, err := os.ReadDir(h.baseLogDir) + if err != nil { + return nil, fmt.Errorf("failed to read base log directory: %w", err) + } + + var taskDirs []string + for _, entry := range entries { + if entry.IsDir() { + taskDirs = append(taskDirs, entry.Name()) + } + } + + return taskDirs, nil +} + +// CleanupOldLogs removes old task logs beyond the specified limit +func (h *TaskLogHandler) CleanupOldLogs(maxTasks int) error { + config := TaskLoggerConfig{ + BaseLogDir: h.baseLogDir, + MaxTasks: maxTasks, + } + + // Create a temporary logger to trigger cleanup + tempLogger := &FileTaskLogger{ + config: config, + } + + tempLogger.cleanupOldLogs() + return nil +} diff --git a/weed/worker/tasks/task_logger.go b/weed/worker/tasks/task_logger.go new file mode 100644 index 000000000..e9c06c35c --- /dev/null +++ b/weed/worker/tasks/task_logger.go @@ -0,0 +1,432 @@ +package tasks + +import ( + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// TaskLogger provides file-based logging for individual tasks +type TaskLogger interface { + // Log methods + Info(message string, args ...interface{}) + Warning(message string, args ...interface{}) + Error(message string, args ...interface{}) + Debug(message string, args ...interface{}) + + // Progress and status logging + LogProgress(progress float64, message string) + LogStatus(status string, message string) + + // Structured logging + LogWithFields(level string, message string, fields map[string]interface{}) + + // Lifecycle + Close() error + GetLogDir() string +} + +// LoggerProvider interface for tasks that support logging +type LoggerProvider interface { + InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error + GetTaskLogger() TaskLogger +} + +// TaskLoggerConfig holds configuration for task logging +type TaskLoggerConfig struct { + BaseLogDir string + MaxTasks int // Maximum number of task logs to keep + MaxLogSizeMB int // Maximum log file size in MB + EnableConsole bool // Also log to console +} 
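A short usage sketch of the file-based logger defined below, using only the exported names visible in this diff (TaskLoggerConfig, NewTaskLogger, and the TaskLogger methods); the IDs, server address, and paths are illustrative:

package main

import (
	"log"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func main() {
	// Override the default /data/task_logs location and keep only the
	// 20 most recent task log directories.
	cfg := tasks.TaskLoggerConfig{
		BaseLogDir:    "/tmp/seaweedfs/task_logs",
		MaxTasks:      20,
		MaxLogSizeMB:  10,
		EnableConsole: false, // file-only; skip the glog/stderr mirroring
	}

	logger, err := tasks.NewTaskLogger("task_demo", types.TaskTypeErasureCoding, "worker-1",
		types.TaskParams{VolumeID: 42, Server: "10.0.0.5:8080", Collection: "pics"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer logger.Close() // finalizes metadata.json with end time and duration

	logger.Info("copying volume files from %s", "10.0.0.5:8080")
	logger.LogProgress(50.0, "EC shards generated")
	// On disk: <BaseLogDir>/task_demo_<taskType>_worker-1_<timestamp>/ holding
	// task.log (one JSON entry per line) and metadata.json.
}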
+ +// FileTaskLogger implements TaskLogger using file-based logging +type FileTaskLogger struct { + taskID string + taskType types.TaskType + workerID string + logDir string + logFile *os.File + mutex sync.Mutex + config TaskLoggerConfig + metadata *TaskLogMetadata + closed bool +} + +// TaskLogMetadata contains metadata about the task execution +type TaskLogMetadata struct { + TaskID string `json:"task_id"` + TaskType string `json:"task_type"` + WorkerID string `json:"worker_id"` + StartTime time.Time `json:"start_time"` + EndTime *time.Time `json:"end_time,omitempty"` + Duration *time.Duration `json:"duration,omitempty"` + Status string `json:"status"` + Progress float64 `json:"progress"` + VolumeID uint32 `json:"volume_id,omitempty"` + Server string `json:"server,omitempty"` + Collection string `json:"collection,omitempty"` + CustomData map[string]interface{} `json:"custom_data,omitempty"` + LogFilePath string `json:"log_file_path"` + CreatedAt time.Time `json:"created_at"` +} + +// TaskLogEntry represents a single log entry +type TaskLogEntry struct { + Timestamp time.Time `json:"timestamp"` + Level string `json:"level"` + Message string `json:"message"` + Fields map[string]interface{} `json:"fields,omitempty"` + Progress *float64 `json:"progress,omitempty"` + Status *string `json:"status,omitempty"` +} + +// DefaultTaskLoggerConfig returns default configuration +func DefaultTaskLoggerConfig() TaskLoggerConfig { + return TaskLoggerConfig{ + BaseLogDir: "/data/task_logs", // Use persistent data directory + MaxTasks: 100, // Keep last 100 task logs + MaxLogSizeMB: 10, + EnableConsole: true, + } +} + +// NewTaskLogger creates a new file-based task logger +func NewTaskLogger(taskID string, taskType types.TaskType, workerID string, params types.TaskParams, config TaskLoggerConfig) (TaskLogger, error) { + // Create unique directory name with timestamp + timestamp := time.Now().Format("20060102_150405") + dirName := fmt.Sprintf("%s_%s_%s_%s", taskID, taskType, workerID, timestamp) + logDir := filepath.Join(config.BaseLogDir, dirName) + + // Create log directory + if err := os.MkdirAll(logDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create log directory %s: %w", logDir, err) + } + + // Create log file + logFilePath := filepath.Join(logDir, "task.log") + logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return nil, fmt.Errorf("failed to create log file %s: %w", logFilePath, err) + } + + // Create metadata + metadata := &TaskLogMetadata{ + TaskID: taskID, + TaskType: string(taskType), + WorkerID: workerID, + StartTime: time.Now(), + Status: "started", + Progress: 0.0, + VolumeID: params.VolumeID, + Server: params.Server, + Collection: params.Collection, + CustomData: make(map[string]interface{}), + LogFilePath: logFilePath, + CreatedAt: time.Now(), + } + + logger := &FileTaskLogger{ + taskID: taskID, + taskType: taskType, + workerID: workerID, + logDir: logDir, + logFile: logFile, + config: config, + metadata: metadata, + closed: false, + } + + // Write initial log entry + logger.Info("Task logger initialized for %s (type: %s, worker: %s)", taskID, taskType, workerID) + logger.LogWithFields("INFO", "Task parameters", map[string]interface{}{ + "volume_id": params.VolumeID, + "server": params.Server, + "collection": params.Collection, + }) + + // Save initial metadata + if err := logger.saveMetadata(); err != nil { + glog.Warningf("Failed to save initial task metadata: %v", err) + } + + // Clean up old task logs + go 
logger.cleanupOldLogs() + + return logger, nil +} + +// Info logs an info message +func (l *FileTaskLogger) Info(message string, args ...interface{}) { + l.log("INFO", message, args...) +} + +// Warning logs a warning message +func (l *FileTaskLogger) Warning(message string, args ...interface{}) { + l.log("WARNING", message, args...) +} + +// Error logs an error message +func (l *FileTaskLogger) Error(message string, args ...interface{}) { + l.log("ERROR", message, args...) +} + +// Debug logs a debug message +func (l *FileTaskLogger) Debug(message string, args ...interface{}) { + l.log("DEBUG", message, args...) +} + +// LogProgress logs task progress +func (l *FileTaskLogger) LogProgress(progress float64, message string) { + l.mutex.Lock() + l.metadata.Progress = progress + l.mutex.Unlock() + + entry := TaskLogEntry{ + Timestamp: time.Now(), + Level: "INFO", + Message: message, + Progress: &progress, + } + + l.writeLogEntry(entry) + l.saveMetadata() // Update metadata with new progress +} + +// LogStatus logs task status change +func (l *FileTaskLogger) LogStatus(status string, message string) { + l.mutex.Lock() + l.metadata.Status = status + l.mutex.Unlock() + + entry := TaskLogEntry{ + Timestamp: time.Now(), + Level: "INFO", + Message: message, + Status: &status, + } + + l.writeLogEntry(entry) + l.saveMetadata() // Update metadata with new status +} + +// LogWithFields logs a message with structured fields +func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[string]interface{}) { + entry := TaskLogEntry{ + Timestamp: time.Now(), + Level: level, + Message: message, + Fields: fields, + } + + l.writeLogEntry(entry) +} + +// Close closes the logger and finalizes metadata +func (l *FileTaskLogger) Close() error { + l.mutex.Lock() + defer l.mutex.Unlock() + + if l.closed { + return nil + } + + // Finalize metadata + endTime := time.Now() + duration := endTime.Sub(l.metadata.StartTime) + l.metadata.EndTime = &endTime + l.metadata.Duration = &duration + + if l.metadata.Status == "started" { + l.metadata.Status = "completed" + } + + // Save final metadata + l.saveMetadata() + + // Close log file + if l.logFile != nil { + if err := l.logFile.Close(); err != nil { + return fmt.Errorf("failed to close log file: %w", err) + } + } + + l.closed = true + l.Info("Task logger closed for %s", l.taskID) + + return nil +} + +// GetLogDir returns the log directory path +func (l *FileTaskLogger) GetLogDir() string { + return l.logDir +} + +// log is the internal logging method +func (l *FileTaskLogger) log(level string, message string, args ...interface{}) { + formattedMessage := fmt.Sprintf(message, args...) 
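// The formatted message becomes a single TaskLogEntry, which writeLogEntry
// below appends to task.log as one JSON object per line, e.g. (illustrative):
//
//	{"timestamp":"2025-01-02T15:04:05Z","level":"INFO","message":"Task logger initialized for task_demo (type: erasure_coding, worker: worker-1)"}
//
// ReadTaskLogs later replays the file with a streaming json.Decoder until
// io.EOF, so every entry is self-contained on its own line.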
+ + entry := TaskLogEntry{ + Timestamp: time.Now(), + Level: level, + Message: formattedMessage, + } + + l.writeLogEntry(entry) +} + +// writeLogEntry writes a log entry to the file +func (l *FileTaskLogger) writeLogEntry(entry TaskLogEntry) { + l.mutex.Lock() + defer l.mutex.Unlock() + + if l.closed || l.logFile == nil { + return + } + + // Format as JSON line + jsonData, err := json.Marshal(entry) + if err != nil { + glog.Errorf("Failed to marshal log entry: %v", err) + return + } + + // Write to file + if _, err := l.logFile.WriteString(string(jsonData) + "\n"); err != nil { + glog.Errorf("Failed to write log entry: %v", err) + return + } + + // Flush to disk + if err := l.logFile.Sync(); err != nil { + glog.Errorf("Failed to sync log file: %v", err) + } + + // Also log to console and stderr if enabled + if l.config.EnableConsole { + // Log to glog with proper call depth to show actual source location + // We need depth 3 to skip: writeLogEntry -> log -> Info/Warning/Error calls to reach the original caller + formattedMsg := fmt.Sprintf("[TASK-%s] %s: %s", l.taskID, entry.Level, entry.Message) + switch entry.Level { + case "ERROR": + glog.ErrorDepth(3, formattedMsg) + case "WARNING": + glog.WarningDepth(3, formattedMsg) + default: // INFO, DEBUG, etc. + glog.InfoDepth(3, formattedMsg) + } + // Also log to stderr for immediate visibility + fmt.Fprintf(os.Stderr, "[TASK-%s] %s: %s\n", l.taskID, entry.Level, entry.Message) + } +} + +// saveMetadata saves task metadata to file +func (l *FileTaskLogger) saveMetadata() error { + metadataPath := filepath.Join(l.logDir, "metadata.json") + + data, err := json.MarshalIndent(l.metadata, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal metadata: %w", err) + } + + return os.WriteFile(metadataPath, data, 0644) +} + +// cleanupOldLogs removes old task log directories to maintain the limit +func (l *FileTaskLogger) cleanupOldLogs() { + baseDir := l.config.BaseLogDir + + entries, err := os.ReadDir(baseDir) + if err != nil { + glog.Warningf("Failed to read log directory %s: %v", baseDir, err) + return + } + + // Filter for directories only + var dirs []os.DirEntry + for _, entry := range entries { + if entry.IsDir() { + dirs = append(dirs, entry) + } + } + + // If we're under the limit, nothing to clean + if len(dirs) <= l.config.MaxTasks { + return + } + + // Sort by modification time (oldest first) + sort.Slice(dirs, func(i, j int) bool { + infoI, errI := dirs[i].Info() + infoJ, errJ := dirs[j].Info() + if errI != nil || errJ != nil { + return false + } + return infoI.ModTime().Before(infoJ.ModTime()) + }) + + // Remove oldest directories + numToRemove := len(dirs) - l.config.MaxTasks + for i := 0; i < numToRemove; i++ { + dirPath := filepath.Join(baseDir, dirs[i].Name()) + if err := os.RemoveAll(dirPath); err != nil { + glog.Warningf("Failed to remove old log directory %s: %v", dirPath, err) + } else { + glog.V(1).Infof("Cleaned up old task log directory: %s", dirPath) + } + } + + glog.V(1).Infof("Task log cleanup completed: removed %d old directories", numToRemove) +} + +// GetTaskLogMetadata reads metadata from a task log directory +func GetTaskLogMetadata(logDir string) (*TaskLogMetadata, error) { + metadataPath := filepath.Join(logDir, "metadata.json") + + data, err := os.ReadFile(metadataPath) + if err != nil { + return nil, fmt.Errorf("failed to read metadata file: %w", err) + } + + var metadata TaskLogMetadata + if err := json.Unmarshal(data, &metadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %w", 
err) + } + + return &metadata, nil +} + +// ReadTaskLogs reads all log entries from a task log file +func ReadTaskLogs(logDir string) ([]TaskLogEntry, error) { + logPath := filepath.Join(logDir, "task.log") + + file, err := os.Open(logPath) + if err != nil { + return nil, fmt.Errorf("failed to open log file: %w", err) + } + defer file.Close() + + var entries []TaskLogEntry + decoder := json.NewDecoder(file) + + for { + var entry TaskLogEntry + if err := decoder.Decode(&entry); err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("failed to decode log entry: %w", err) + } + entries = append(entries, entry) + } + + return entries, nil +} diff --git a/weed/worker/tasks/ui_base.go b/weed/worker/tasks/ui_base.go new file mode 100644 index 000000000..ac22c20c4 --- /dev/null +++ b/weed/worker/tasks/ui_base.go @@ -0,0 +1,184 @@ +package tasks + +import ( + "reflect" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// BaseUIProvider provides common UIProvider functionality for all tasks +type BaseUIProvider struct { + taskType types.TaskType + displayName string + description string + icon string + schemaFunc func() *TaskConfigSchema + configFunc func() types.TaskConfig + applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error + applyTaskConfigFunc func(config types.TaskConfig) error +} + +// NewBaseUIProvider creates a new base UI provider +func NewBaseUIProvider( + taskType types.TaskType, + displayName string, + description string, + icon string, + schemaFunc func() *TaskConfigSchema, + configFunc func() types.TaskConfig, + applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error, + applyTaskConfigFunc func(config types.TaskConfig) error, +) *BaseUIProvider { + return &BaseUIProvider{ + taskType: taskType, + displayName: displayName, + description: description, + icon: icon, + schemaFunc: schemaFunc, + configFunc: configFunc, + applyTaskPolicyFunc: applyTaskPolicyFunc, + applyTaskConfigFunc: applyTaskConfigFunc, + } +} + +// GetTaskType returns the task type +func (ui *BaseUIProvider) GetTaskType() types.TaskType { + return ui.taskType +} + +// GetDisplayName returns the human-readable name +func (ui *BaseUIProvider) GetDisplayName() string { + return ui.displayName +} + +// GetDescription returns a description of what this task does +func (ui *BaseUIProvider) GetDescription() string { + return ui.description +} + +// GetIcon returns the icon CSS class for this task type +func (ui *BaseUIProvider) GetIcon() string { + return ui.icon +} + +// GetCurrentConfig returns the current configuration as TaskConfig +func (ui *BaseUIProvider) GetCurrentConfig() types.TaskConfig { + return ui.configFunc() +} + +// ApplyTaskPolicy applies protobuf TaskPolicy configuration +func (ui *BaseUIProvider) ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error { + return ui.applyTaskPolicyFunc(policy) +} + +// ApplyTaskConfig applies TaskConfig interface configuration +func (ui *BaseUIProvider) ApplyTaskConfig(config types.TaskConfig) error { + return ui.applyTaskConfigFunc(config) +} + +// CommonConfigGetter provides a common pattern for getting current configuration +type CommonConfigGetter[T any] struct { + defaultConfig T + detectorFunc func() T + schedulerFunc func() T +} + +// NewCommonConfigGetter creates a new common config getter +func NewCommonConfigGetter[T any]( + defaultConfig T, + detectorFunc func() T, + schedulerFunc func() T, +) *CommonConfigGetter[T] { + return 
&CommonConfigGetter[T]{ + defaultConfig: defaultConfig, + detectorFunc: detectorFunc, + schedulerFunc: schedulerFunc, + } +} + +// GetConfig returns the merged configuration +func (cg *CommonConfigGetter[T]) GetConfig() T { + config := cg.defaultConfig + + // Apply detector values if available + if cg.detectorFunc != nil { + detectorConfig := cg.detectorFunc() + mergeConfigs(&config, detectorConfig) + } + + // Apply scheduler values if available + if cg.schedulerFunc != nil { + schedulerConfig := cg.schedulerFunc() + mergeConfigs(&config, schedulerConfig) + } + + return config +} + +// mergeConfigs merges non-zero values from source into dest +func mergeConfigs[T any](dest *T, source T) { + destValue := reflect.ValueOf(dest).Elem() + sourceValue := reflect.ValueOf(source) + + if destValue.Kind() != reflect.Struct || sourceValue.Kind() != reflect.Struct { + return + } + + for i := 0; i < destValue.NumField(); i++ { + destField := destValue.Field(i) + sourceField := sourceValue.Field(i) + + if !destField.CanSet() { + continue + } + + // Only copy non-zero values + if !sourceField.IsZero() { + if destField.Type() == sourceField.Type() { + destField.Set(sourceField) + } + } + } +} + +// RegisterUIFunc provides a common registration function signature +type RegisterUIFunc[D, S any] func(uiRegistry *types.UIRegistry, detector D, scheduler S) + +// CommonRegisterUI provides a common registration implementation +func CommonRegisterUI[D, S any]( + taskType types.TaskType, + displayName string, + uiRegistry *types.UIRegistry, + detector D, + scheduler S, + schemaFunc func() *TaskConfigSchema, + configFunc func() types.TaskConfig, + applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error, + applyTaskConfigFunc func(config types.TaskConfig) error, +) { + // Get metadata from schema + schema := schemaFunc() + description := "Task configuration" + icon := "fas fa-cog" + + if schema != nil { + description = schema.Description + icon = schema.Icon + } + + uiProvider := NewBaseUIProvider( + taskType, + displayName, + description, + icon, + schemaFunc, + configFunc, + applyTaskPolicyFunc, + applyTaskConfigFunc, + ) + + uiRegistry.RegisterUI(uiProvider) + glog.V(1).Infof("✅
Registered %s task UI provider", taskType) +} diff --git a/weed/worker/tasks/vacuum/config.go b/weed/worker/tasks/vacuum/config.go new file mode 100644 index 000000000..fe8c0e8c5 --- /dev/null +++ b/weed/worker/tasks/vacuum/config.go @@ -0,0 +1,190 @@ +package vacuum + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with vacuum-specific settings +type Config struct { + base.BaseConfig + GarbageThreshold float64 `json:"garbage_threshold"` + MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` + MinIntervalSeconds int `json:"min_interval_seconds"` +} + +// NewDefaultConfig creates a new default vacuum configuration +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 2 * 60 * 60, // 2 hours + MaxConcurrent: 2, + }, + GarbageThreshold: 0.3, // 30% + MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours + MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days + } +} + +// ToTaskPolicy converts configuration to a TaskPolicy protobuf message +func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { + return &worker_pb.TaskPolicy{ + Enabled: c.Enabled, + MaxConcurrent: int32(c.MaxConcurrent), + RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), + CheckIntervalSeconds: int32(c.ScanIntervalSeconds), + TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{ + VacuumConfig: &worker_pb.VacuumTaskConfig{ + GarbageThreshold: float64(c.GarbageThreshold), + MinVolumeAgeHours: int32(c.MinVolumeAgeSeconds / 3600), // Convert seconds to hours + MinIntervalSeconds: int32(c.MinIntervalSeconds), + }, + }, + } +} + +// FromTaskPolicy loads configuration from a TaskPolicy protobuf message +func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { + if policy == nil { + return fmt.Errorf("policy is nil") + } + + // Set general TaskPolicy fields + c.Enabled = policy.Enabled + c.MaxConcurrent = int(policy.MaxConcurrent) + c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping + + // Set vacuum-specific fields from the task config + if vacuumConfig := policy.GetVacuumConfig(); vacuumConfig != nil { + c.GarbageThreshold = float64(vacuumConfig.GarbageThreshold) + c.MinVolumeAgeSeconds = int(vacuumConfig.MinVolumeAgeHours * 3600) // Convert hours to seconds + c.MinIntervalSeconds = int(vacuumConfig.MinIntervalSeconds) + } + + return nil +} + +// LoadConfigFromPersistence loads configuration from the persistence layer if available +func LoadConfigFromPersistence(configPersistence interface{}) *Config { + config := NewDefaultConfig() + + // Try to load from persistence if available + if persistence, ok := configPersistence.(interface { + LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error) + }); ok { + if policy, err := persistence.LoadVacuumTaskPolicy(); err == nil && policy != nil { + if err := config.FromTaskPolicy(policy); err == nil { + glog.V(1).Infof("Loaded vacuum configuration from persistence") + return config + } + } + } + + glog.V(1).Infof("Using default vacuum configuration") + return config +} + +// GetConfigSpec returns the configuration schema for vacuum tasks +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable 
Vacuum Tasks", + Description: "Whether vacuum tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic vacuum task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 2 * 60 * 60, + MinValue: 10 * 60, + MaxValue: 24 * 60 * 60, + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for volumes needing vacuum", + HelpText: "The system will check for volumes that need vacuuming at this interval", + Placeholder: "2", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 2, + MinValue: 1, + MaxValue: 10, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of vacuum tasks that can run simultaneously", + HelpText: "Limits the number of vacuum operations running at the same time to control system load", + Placeholder: "2 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "garbage_threshold", + JSONName: "garbage_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.3, + MinValue: 0.0, + MaxValue: 1.0, + Required: true, + DisplayName: "Garbage Percentage Threshold", + Description: "Trigger vacuum when garbage ratio exceeds this percentage", + HelpText: "Volumes with more deleted content than this threshold will be vacuumed", + Placeholder: "0.30 (30%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_volume_age_seconds", + JSONName: "min_volume_age_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 24 * 60 * 60, + MinValue: 1 * 60 * 60, + MaxValue: 7 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Volume Age", + Description: "Only vacuum volumes older than this duration", + HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to", + Placeholder: "24", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "min_interval_seconds", + JSONName: "min_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 7 * 24 * 60 * 60, + MinValue: 1 * 24 * 60 * 60, + MaxValue: 30 * 24 * 60 * 60, + Required: true, + DisplayName: "Minimum Interval", + Description: "Minimum time between vacuum operations on the same volume", + HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time", + Placeholder: "7", + Unit: config.UnitDays, + InputType: "interval", + CSSClasses: "form-control", + }, + }, + } +} diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go new file mode 100644 index 000000000..7b5a1baf0 --- /dev/null +++ b/weed/worker/tasks/vacuum/detection.go @@ -0,0 +1,112 @@ +package vacuum + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Detection implements the detection logic for vacuum tasks +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + if !config.IsEnabled() { + return nil, nil + } + + vacuumConfig := config.(*Config) + var results []*types.TaskDetectionResult + minVolumeAge := 
time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second + + skippedDueToGarbage := 0 + skippedDueToAge := 0 + + for _, metric := range metrics { + // Check if volume needs vacuum + if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { + priority := types.TaskPriorityNormal + if metric.GarbageRatio > 0.6 { + priority = types.TaskPriorityHigh + } + + result := &types.TaskDetectionResult{ + TaskType: types.TaskTypeVacuum, + VolumeID: metric.VolumeID, + Server: metric.Server, + Collection: metric.Collection, + Priority: priority, + Reason: "Volume has excessive garbage requiring vacuum", + ScheduleAt: time.Now(), + } + results = append(results, result) + } else { + // Track why the volume was skipped, for the summary log below + if metric.GarbageRatio < vacuumConfig.GarbageThreshold { + skippedDueToGarbage++ + } + if metric.Age < minVolumeAge { + skippedDueToAge++ + } + } + } + + // Log debug summary if no tasks were created + if len(results) == 0 && len(metrics) > 0 { + totalVolumes := len(metrics) + glog.Infof("VACUUM: No tasks created for %d volumes. Threshold=%.2f%%, MinAge=%s. Skipped: %d (garbage<threshold), %d (age<minimum)", + totalVolumes, vacuumConfig.GarbageThreshold*100, minVolumeAge, skippedDueToGarbage, skippedDueToAge) + + // Show details for first few volumes + for i, metric := range metrics { + if i >= 3 { // Limit to first 3 volumes + break + } + glog.Infof("VACUUM: Volume %d: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)", + metric.VolumeID, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100, + metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute)) + } + } + + return results, nil +} + +// Scheduling implements the scheduling logic for vacuum tasks +func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool { + vacuumConfig := config.(*Config) + + // Count running vacuum tasks + runningVacuumCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskTypeVacuum { + runningVacuumCount++ + } + } + + // Check concurrency limit + if runningVacuumCount >= vacuumConfig.MaxConcurrent { + return false + } + + // Check for available workers with vacuum capability + for _, worker := range availableWorkers { + if worker.CurrentLoad < worker.MaxConcurrent { + for _, capability := range worker.Capabilities { + if capability == types.TaskTypeVacuum { + return true + } + } + } + } + + return false +} + +// CreateTask creates a new vacuum task instance +func CreateTask(params types.TaskParams) (types.TaskInterface, error) { + // Create and return the vacuum task using existing Task type + return NewTask(params.Server, params.VolumeID), nil +} diff --git a/weed/worker/tasks/vacuum/ui.go b/weed/worker/tasks/vacuum/ui.go deleted file mode 100644 index 6f67a801a..000000000 --- a/weed/worker/tasks/vacuum/ui.go +++ /dev/null @@ -1,314 +0,0 @@ -package vacuum - -import ( - "fmt" - "html/template" - "strconv" - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// UIProvider provides the UI for vacuum task configuration -type UIProvider struct { - detector *VacuumDetector - scheduler *VacuumScheduler -} - -// NewUIProvider creates a new vacuum UI provider -func NewUIProvider(detector *VacuumDetector, scheduler *VacuumScheduler) *UIProvider { - return &UIProvider{ - detector: detector, - scheduler: 
scheduler, - } -} - -// GetTaskType returns the task type -func (ui *UIProvider) GetTaskType() types.TaskType { - return types.TaskTypeVacuum -} - -// GetDisplayName returns the human-readable name -func (ui *UIProvider) GetDisplayName() string { - return "Volume Vacuum" -} - -// GetDescription returns a description of what this task does -func (ui *UIProvider) GetDescription() string { - return "Reclaims disk space by removing deleted files from volumes" -} - -// GetIcon returns the icon CSS class for this task type -func (ui *UIProvider) GetIcon() string { - return "fas fa-broom text-primary" -} - -// VacuumConfig represents the vacuum configuration -type VacuumConfig struct { - Enabled bool `json:"enabled"` - GarbageThreshold float64 `json:"garbage_threshold"` - ScanIntervalSeconds int `json:"scan_interval_seconds"` - MaxConcurrent int `json:"max_concurrent"` - MinVolumeAgeSeconds int `json:"min_volume_age_seconds"` - MinIntervalSeconds int `json:"min_interval_seconds"` -} - -// Helper functions for duration conversion -func secondsToDuration(seconds int) time.Duration { - return time.Duration(seconds) * time.Second -} - -func durationToSeconds(d time.Duration) int { - return int(d.Seconds()) -} - -// formatDurationForUser formats seconds as a user-friendly duration string -func formatDurationForUser(seconds int) string { - d := secondsToDuration(seconds) - if d < time.Minute { - return fmt.Sprintf("%ds", seconds) - } - if d < time.Hour { - return fmt.Sprintf("%.0fm", d.Minutes()) - } - if d < 24*time.Hour { - return fmt.Sprintf("%.1fh", d.Hours()) - } - return fmt.Sprintf("%.1fd", d.Hours()/24) -} - -// RenderConfigForm renders the configuration form HTML -func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) { - config := ui.getCurrentVacuumConfig() - - // Build form using the FormBuilder helper - form := types.NewFormBuilder() - - // Detection Settings - form.AddCheckboxField( - "enabled", - "Enable Vacuum Tasks", - "Whether vacuum tasks should be automatically created", - config.Enabled, - ) - - form.AddNumberField( - "garbage_threshold", - "Garbage Threshold (%)", - "Trigger vacuum when garbage ratio exceeds this percentage (0.0-1.0)", - config.GarbageThreshold, - true, - ) - - form.AddDurationField( - "scan_interval", - "Scan Interval", - "How often to scan for volumes needing vacuum", - secondsToDuration(config.ScanIntervalSeconds), - true, - ) - - form.AddDurationField( - "min_volume_age", - "Minimum Volume Age", - "Only vacuum volumes older than this duration", - secondsToDuration(config.MinVolumeAgeSeconds), - true, - ) - - // Scheduling Settings - form.AddNumberField( - "max_concurrent", - "Max Concurrent Tasks", - "Maximum number of vacuum tasks that can run simultaneously", - float64(config.MaxConcurrent), - true, - ) - - form.AddDurationField( - "min_interval", - "Minimum Interval", - "Minimum time between vacuum operations on the same volume", - secondsToDuration(config.MinIntervalSeconds), - true, - ) - - // Generate organized form sections using Bootstrap components - html := ` -<div class="row"> - <div class="col-12"> - <div class="card mb-4"> - <div class="card-header"> - <h5 class="mb-0"> - <i class="fas fa-search me-2"></i> - Detection Settings - </h5> - </div> - <div class="card-body"> -` + string(form.Build()) + ` - </div> - </div> - </div> -</div> - -<script> -function resetForm() { - if (confirm('Reset all vacuum settings to defaults?')) { - // Reset to default values - document.querySelector('input[name="enabled"]').checked = 
true; - document.querySelector('input[name="garbage_threshold"]').value = '0.3'; - document.querySelector('input[name="scan_interval"]').value = '30m'; - document.querySelector('input[name="min_volume_age"]').value = '1h'; - document.querySelector('input[name="max_concurrent"]').value = '2'; - document.querySelector('input[name="min_interval"]').value = '6h'; - } -} -</script> -` - - return template.HTML(html), nil -} - -// ParseConfigForm parses form data into configuration -func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) { - config := &VacuumConfig{} - - // Parse enabled checkbox - config.Enabled = len(formData["enabled"]) > 0 && formData["enabled"][0] == "on" - - // Parse garbage threshold - if thresholdStr := formData["garbage_threshold"]; len(thresholdStr) > 0 { - if threshold, err := strconv.ParseFloat(thresholdStr[0], 64); err != nil { - return nil, fmt.Errorf("invalid garbage threshold: %w", err) - } else if threshold < 0 || threshold > 1 { - return nil, fmt.Errorf("garbage threshold must be between 0.0 and 1.0") - } else { - config.GarbageThreshold = threshold - } - } - - // Parse scan interval - if intervalStr := formData["scan_interval"]; len(intervalStr) > 0 { - if interval, err := time.ParseDuration(intervalStr[0]); err != nil { - return nil, fmt.Errorf("invalid scan interval: %w", err) - } else { - config.ScanIntervalSeconds = durationToSeconds(interval) - } - } - - // Parse min volume age - if ageStr := formData["min_volume_age"]; len(ageStr) > 0 { - if age, err := time.ParseDuration(ageStr[0]); err != nil { - return nil, fmt.Errorf("invalid min volume age: %w", err) - } else { - config.MinVolumeAgeSeconds = durationToSeconds(age) - } - } - - // Parse max concurrent - if concurrentStr := formData["max_concurrent"]; len(concurrentStr) > 0 { - if concurrent, err := strconv.Atoi(concurrentStr[0]); err != nil { - return nil, fmt.Errorf("invalid max concurrent: %w", err) - } else if concurrent < 1 { - return nil, fmt.Errorf("max concurrent must be at least 1") - } else { - config.MaxConcurrent = concurrent - } - } - - // Parse min interval - if intervalStr := formData["min_interval"]; len(intervalStr) > 0 { - if interval, err := time.ParseDuration(intervalStr[0]); err != nil { - return nil, fmt.Errorf("invalid min interval: %w", err) - } else { - config.MinIntervalSeconds = durationToSeconds(interval) - } - } - - return config, nil -} - -// GetCurrentConfig returns the current configuration -func (ui *UIProvider) GetCurrentConfig() interface{} { - return ui.getCurrentVacuumConfig() -} - -// ApplyConfig applies the new configuration -func (ui *UIProvider) ApplyConfig(config interface{}) error { - vacuumConfig, ok := config.(*VacuumConfig) - if !ok { - return fmt.Errorf("invalid config type, expected *VacuumConfig") - } - - // Apply to detector - if ui.detector != nil { - ui.detector.SetEnabled(vacuumConfig.Enabled) - ui.detector.SetGarbageThreshold(vacuumConfig.GarbageThreshold) - ui.detector.SetScanInterval(secondsToDuration(vacuumConfig.ScanIntervalSeconds)) - ui.detector.SetMinVolumeAge(secondsToDuration(vacuumConfig.MinVolumeAgeSeconds)) - } - - // Apply to scheduler - if ui.scheduler != nil { - ui.scheduler.SetEnabled(vacuumConfig.Enabled) - ui.scheduler.SetMaxConcurrent(vacuumConfig.MaxConcurrent) - ui.scheduler.SetMinInterval(secondsToDuration(vacuumConfig.MinIntervalSeconds)) - } - - glog.V(1).Infof("Applied vacuum configuration: enabled=%v, threshold=%.1f%%, scan_interval=%s, max_concurrent=%d", - vacuumConfig.Enabled, 
vacuumConfig.GarbageThreshold*100, formatDurationForUser(vacuumConfig.ScanIntervalSeconds), vacuumConfig.MaxConcurrent) - - return nil -} - -// getCurrentVacuumConfig gets the current configuration from detector and scheduler -func (ui *UIProvider) getCurrentVacuumConfig() *VacuumConfig { - config := &VacuumConfig{ - // Default values (fallback if detectors/schedulers are nil) - Enabled: true, - GarbageThreshold: 0.3, - ScanIntervalSeconds: 30 * 60, - MinVolumeAgeSeconds: 1 * 60 * 60, - MaxConcurrent: 2, - MinIntervalSeconds: 6 * 60 * 60, - } - - // Get current values from detector - if ui.detector != nil { - config.Enabled = ui.detector.IsEnabled() - config.GarbageThreshold = ui.detector.GetGarbageThreshold() - config.ScanIntervalSeconds = durationToSeconds(ui.detector.ScanInterval()) - config.MinVolumeAgeSeconds = durationToSeconds(ui.detector.GetMinVolumeAge()) - } - - // Get current values from scheduler - if ui.scheduler != nil { - config.MaxConcurrent = ui.scheduler.GetMaxConcurrent() - config.MinIntervalSeconds = durationToSeconds(ui.scheduler.GetMinInterval()) - } - - return config -} - -// RegisterUI registers the vacuum UI provider with the UI registry -func RegisterUI(uiRegistry *types.UIRegistry, detector *VacuumDetector, scheduler *VacuumScheduler) { - uiProvider := NewUIProvider(detector, scheduler) - uiRegistry.RegisterUI(uiProvider) - - glog.V(1).Infof("✅
Registered vacuum task UI provider") -} - -// Example: How to get the UI provider for external use -func GetUIProvider(uiRegistry *types.UIRegistry) *UIProvider { - provider := uiRegistry.GetProvider(types.TaskTypeVacuum) - if provider == nil { - return nil - } - - if vacuumProvider, ok := provider.(*UIProvider); ok { - return vacuumProvider - } - - return nil -} diff --git a/weed/worker/tasks/vacuum/vacuum.go b/weed/worker/tasks/vacuum/vacuum.go index dbfe35cf8..9cd254958 100644 --- a/weed/worker/tasks/vacuum/vacuum.go +++ b/weed/worker/tasks/vacuum/vacuum.go @@ -1,60 +1,184 @@ package vacuum import ( + "context" "fmt" + "io" "time" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/types" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // Task implements vacuum operation to reclaim disk space type Task struct { *tasks.BaseTask - server string - volumeID uint32 + server string + volumeID uint32 + garbageThreshold float64 } // NewTask creates a new vacuum task instance func NewTask(server string, volumeID uint32) *Task { task := &Task{ - BaseTask: tasks.NewBaseTask(types.TaskTypeVacuum), - server: server, - volumeID: volumeID, + BaseTask: tasks.NewBaseTask(types.TaskTypeVacuum), + server: server, + volumeID: volumeID, + garbageThreshold: 0.3, // Default 30% threshold } return task } -// Execute executes the vacuum task +// Execute performs the vacuum operation func (t *Task) Execute(params types.TaskParams) error { - glog.Infof("Starting vacuum task for volume %d on server %s", t.volumeID, t.server) - - // Simulate vacuum operation with progress updates - steps := []struct { - name string - duration time.Duration - progress float64 - }{ - {"Scanning volume", 1 * time.Second, 20}, - {"Identifying deleted files", 2 * time.Second, 50}, - {"Compacting data", 3 * time.Second, 80}, - {"Finalizing vacuum", 1 * time.Second, 100}, + // Use BaseTask.ExecuteTask to handle logging initialization + return t.ExecuteTask(context.Background(), params, t.executeImpl) +} + +// executeImpl is the actual vacuum implementation +func (t *Task) executeImpl(ctx context.Context, params types.TaskParams) error { + t.LogInfo("Starting vacuum for volume %d on server %s", t.volumeID, t.server) + + // Parse garbage threshold from typed parameters + if params.TypedParams != nil { + if vacuumParams := params.TypedParams.GetVacuumParams(); vacuumParams != nil { + t.garbageThreshold = vacuumParams.GarbageThreshold + t.LogWithFields("INFO", "Using garbage threshold from parameters", map[string]interface{}{ + "threshold": t.garbageThreshold, + }) + } + } + + // Convert server address to gRPC address and use proper dial option + grpcAddress := pb.ServerToGrpcAddress(t.server) + var dialOpt grpc.DialOption = grpc.WithTransportCredentials(insecure.NewCredentials()) + if params.GrpcDialOption != nil { + dialOpt = params.GrpcDialOption + } + + conn, err := grpc.NewClient(grpcAddress, dialOpt) + if err != nil { + t.LogError("Failed to connect to volume server %s: %v", t.server, err) + return fmt.Errorf("failed to connect to volume server %s: %v", t.server, err) + } + defer conn.Close() + + client := volume_server_pb.NewVolumeServerClient(conn) + + // Step 1: Check vacuum eligibility + t.SetProgress(10.0) + t.LogDebug("Checking vacuum eligibility for volume %d", t.volumeID) + + checkResp, err := 
client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{ + VolumeId: t.volumeID, + }) + if err != nil { + t.LogError("Vacuum check failed for volume %d: %v", t.volumeID, err) + return fmt.Errorf("vacuum check failed for volume %d: %v", t.volumeID, err) + } + + // Check if garbage ratio meets threshold + if checkResp.GarbageRatio < t.garbageThreshold { + t.LogWarning("Volume %d garbage ratio %.2f%% is below threshold %.2f%%, skipping vacuum", + t.volumeID, checkResp.GarbageRatio*100, t.garbageThreshold*100) + return fmt.Errorf("volume %d garbage ratio %.2f%% is below threshold %.2f%%, skipping vacuum", + t.volumeID, checkResp.GarbageRatio*100, t.garbageThreshold*100) + } + + t.LogWithFields("INFO", "Volume eligible for vacuum", map[string]interface{}{ + "volume_id": t.volumeID, + "garbage_ratio": checkResp.GarbageRatio, + "threshold": t.garbageThreshold, + "garbage_percent": checkResp.GarbageRatio * 100, + }) + + // Step 2: Compact volume + t.SetProgress(30.0) + t.LogInfo("Starting compact for volume %d", t.volumeID) + + compactStream, err := client.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{ + VolumeId: t.volumeID, + }) + if err != nil { + t.LogError("Vacuum compact failed for volume %d: %v", t.volumeID, err) + return fmt.Errorf("vacuum compact failed for volume %d: %v", t.volumeID, err) } - for _, step := range steps { - if t.IsCancelled() { - return fmt.Errorf("vacuum task cancelled") + // Process compact stream and track progress + var processedBytes int64 + var totalBytes int64 + + for { + resp, err := compactStream.Recv() + if err != nil { + if err == io.EOF { + break + } + t.LogError("Vacuum compact stream error for volume %d: %v", t.volumeID, err) + return fmt.Errorf("vacuum compact stream error for volume %d: %v", t.volumeID, err) } - glog.V(1).Infof("Vacuum task step: %s", step.name) - t.SetProgress(step.progress) + processedBytes = resp.ProcessedBytes + if resp.LoadAvg_1M > 0 { + totalBytes = int64(resp.LoadAvg_1M) // This is a rough approximation + } + + // Update progress based on processed bytes (30% to 70% of total progress) + if totalBytes > 0 { + compactProgress := float64(processedBytes) / float64(totalBytes) + if compactProgress > 1.0 { + compactProgress = 1.0 + } + progress := 30.0 + (compactProgress * 40.0) // 30% to 70% + t.SetProgress(progress) + } - // Simulate work - time.Sleep(step.duration) + // Guard the percentage: totalBytes may still be zero at this point + compactPercent := "unknown" + if totalBytes > 0 { + compactPercent = fmt.Sprintf("%.1f%%", (float64(processedBytes)/float64(totalBytes))*100) + } + t.LogWithFields("DEBUG", "Volume compact progress", map[string]interface{}{ + "volume_id": t.volumeID, + "processed_bytes": processedBytes, + "total_bytes": totalBytes, + "compact_progress": compactPercent, + }) } - glog.Infof("Vacuum task completed for volume %d on server %s", t.volumeID, t.server) + // Step 3: Commit vacuum changes + t.SetProgress(80.0) + t.LogInfo("Committing vacuum for volume %d", t.volumeID) + + commitResp, err := client.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{ + VolumeId: t.volumeID, + }) + if err != nil { + t.LogError("Vacuum commit failed for volume %d: %v", t.volumeID, err) + return fmt.Errorf("vacuum commit failed for volume %d: %v", t.volumeID, err) + } + + // Step 4: Cleanup temporary files + t.SetProgress(90.0) + t.LogInfo("Cleaning up vacuum files for volume %d", t.volumeID) + + _, err = client.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{ + VolumeId: t.volumeID, + }) + if err != nil { + // Log warning but don't fail the task + t.LogWarning("Vacuum cleanup warning for volume %d: %v", 
t.volumeID, err) + } + + t.SetProgress(100.0) + + newVolumeSize := commitResp.VolumeSize + t.LogWithFields("INFO", "Successfully completed vacuum", map[string]interface{}{ + "volume_id": t.volumeID, + "server": t.server, + "new_volume_size": newVolumeSize, + "garbage_reclaimed": true, + }) + return nil } @@ -71,9 +195,20 @@ func (t *Task) Validate(params types.TaskParams) error { // EstimateTime estimates the time needed for the task func (t *Task) EstimateTime(params types.TaskParams) time.Duration { - // Base time for vacuum operation - baseTime := 25 * time.Second + // Base time for vacuum operations - varies by volume size and garbage ratio + // Typically vacuum is faster than EC encoding + baseTime := 5 * time.Minute - // Could adjust based on volume size or usage patterns + // Use default estimation since volume size is not available in typed params return baseTime } + +// GetProgress returns the current progress +func (t *Task) GetProgress() float64 { + return t.BaseTask.GetProgress() +} + +// Cancel cancels the task +func (t *Task) Cancel() error { + return t.BaseTask.Cancel() +} diff --git a/weed/worker/tasks/vacuum/vacuum_detector.go b/weed/worker/tasks/vacuum/vacuum_detector.go deleted file mode 100644 index 6d7230c6c..000000000 --- a/weed/worker/tasks/vacuum/vacuum_detector.go +++ /dev/null @@ -1,132 +0,0 @@ -package vacuum - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// VacuumDetector implements vacuum task detection using code instead of schemas -type VacuumDetector struct { - enabled bool - garbageThreshold float64 - minVolumeAge time.Duration - scanInterval time.Duration -} - -// Compile-time interface assertions -var ( - _ types.TaskDetector = (*VacuumDetector)(nil) - _ types.PolicyConfigurableDetector = (*VacuumDetector)(nil) -) - -// NewVacuumDetector creates a new simple vacuum detector -func NewVacuumDetector() *VacuumDetector { - return &VacuumDetector{ - enabled: true, - garbageThreshold: 0.3, - minVolumeAge: 24 * time.Hour, - scanInterval: 30 * time.Minute, - } -} - -// GetTaskType returns the task type -func (d *VacuumDetector) GetTaskType() types.TaskType { - return types.TaskTypeVacuum -} - -// ScanForTasks scans for volumes that need vacuum operations -func (d *VacuumDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { - if !d.enabled { - return nil, nil - } - - var results []*types.TaskDetectionResult - - for _, metric := range volumeMetrics { - // Check if volume needs vacuum - if metric.GarbageRatio >= d.garbageThreshold && metric.Age >= d.minVolumeAge { - // Higher priority for volumes with more garbage - priority := types.TaskPriorityNormal - if metric.GarbageRatio > 0.6 { - priority = types.TaskPriorityHigh - } - - result := &types.TaskDetectionResult{ - TaskType: types.TaskTypeVacuum, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: priority, - Reason: "Volume has excessive garbage requiring vacuum", - Parameters: map[string]interface{}{ - "garbage_ratio": metric.GarbageRatio, - "volume_age": metric.Age.String(), - }, - ScheduleAt: time.Now(), - } - results = append(results, result) - } - } - - glog.V(2).Infof("Vacuum detector found %d volumes needing vacuum", len(results)) - return results, nil -} - -// ScanInterval returns how often this detector should scan -func (d *VacuumDetector) ScanInterval() time.Duration { - return d.scanInterval 
-} - -// IsEnabled returns whether this detector is enabled -func (d *VacuumDetector) IsEnabled() bool { - return d.enabled -} - -// Configuration setters - -func (d *VacuumDetector) SetEnabled(enabled bool) { - d.enabled = enabled -} - -func (d *VacuumDetector) SetGarbageThreshold(threshold float64) { - d.garbageThreshold = threshold -} - -func (d *VacuumDetector) SetScanInterval(interval time.Duration) { - d.scanInterval = interval -} - -func (d *VacuumDetector) SetMinVolumeAge(age time.Duration) { - d.minVolumeAge = age -} - -// GetGarbageThreshold returns the current garbage threshold -func (d *VacuumDetector) GetGarbageThreshold() float64 { - return d.garbageThreshold -} - -// GetMinVolumeAge returns the minimum volume age -func (d *VacuumDetector) GetMinVolumeAge() time.Duration { - return d.minVolumeAge -} - -// GetScanInterval returns the scan interval -func (d *VacuumDetector) GetScanInterval() time.Duration { - return d.scanInterval -} - -// ConfigureFromPolicy configures the detector based on the maintenance policy -func (d *VacuumDetector) ConfigureFromPolicy(policy interface{}) { - // Type assert to the maintenance policy type we expect - if maintenancePolicy, ok := policy.(interface { - GetVacuumEnabled() bool - GetVacuumGarbageRatio() float64 - }); ok { - d.SetEnabled(maintenancePolicy.GetVacuumEnabled()) - d.SetGarbageThreshold(maintenancePolicy.GetVacuumGarbageRatio()) - } else { - glog.V(1).Infof("Could not configure vacuum detector from policy: unsupported policy type") - } -} diff --git a/weed/worker/tasks/vacuum/vacuum_register.go b/weed/worker/tasks/vacuum/vacuum_register.go index 7d930a88e..d660c9d42 100644 --- a/weed/worker/tasks/vacuum/vacuum_register.go +++ b/weed/worker/tasks/vacuum/vacuum_register.go @@ -2,80 +2,71 @@ package vacuum import ( "fmt" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// Factory creates vacuum task instances -type Factory struct { - *tasks.BaseTaskFactory -} +// Global variable to hold the task definition for configuration updates +var globalTaskDef *base.TaskDefinition -// NewFactory creates a new vacuum task factory -func NewFactory() *Factory { - return &Factory{ - BaseTaskFactory: tasks.NewBaseTaskFactory( - types.TaskTypeVacuum, - []string{"vacuum", "storage"}, - "Vacuum operation to reclaim disk space by removing deleted files", - ), - } -} - -// Create creates a new vacuum task instance -func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - task := NewTask(params.Server, params.VolumeID) - task.SetEstimatedDuration(task.EstimateTime(params)) +// Auto-register this task when the package is imported +func init() { + RegisterVacuumTask() - return task, nil + // Register config updater + tasks.AutoRegisterConfigUpdater(types.TaskTypeVacuum, UpdateConfigFromPersistence) } -// Shared detector and scheduler instances -var ( - sharedDetector *VacuumDetector - sharedScheduler *VacuumScheduler -) +// RegisterVacuumTask registers the vacuum task with the new architecture +func RegisterVacuumTask() { + // Create configuration instance + config := NewDefaultConfig() -// getSharedInstances returns the shared detector and scheduler instances -func 
getSharedInstances() (*VacuumDetector, *VacuumScheduler) { - if sharedDetector == nil { - sharedDetector = NewVacuumDetector() - } - if sharedScheduler == nil { - sharedScheduler = NewVacuumScheduler() + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeVacuum, + Name: "vacuum", + DisplayName: "Volume Vacuum", + Description: "Reclaims disk space by removing deleted files from volumes", + Icon: "fas fa-broom text-primary", + Capabilities: []string{"vacuum", "storage"}, + + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: CreateTask, + DetectionFunc: Detection, + ScanInterval: 2 * time.Hour, + SchedulingFunc: Scheduling, + MaxConcurrent: 2, + RepeatInterval: 7 * 24 * time.Hour, } - return sharedDetector, sharedScheduler -} -// GetSharedInstances returns the shared detector and scheduler instances (public access) -func GetSharedInstances() (*VacuumDetector, *VacuumScheduler) { - return getSharedInstances() + // Store task definition globally for configuration updates + globalTaskDef = taskDef + + // Register everything with a single function call! + base.RegisterTask(taskDef) } -// Auto-register this task when the package is imported -func init() { - factory := NewFactory() - tasks.AutoRegister(types.TaskTypeVacuum, factory) +// UpdateConfigFromPersistence updates the vacuum configuration from persistence +func UpdateConfigFromPersistence(configPersistence interface{}) error { + if globalTaskDef == nil { + return fmt.Errorf("vacuum task not registered") + } - // Get shared instances for all registrations - detector, scheduler := getSharedInstances() + // Load configuration from persistence + newConfig := LoadConfigFromPersistence(configPersistence) + if newConfig == nil { + return fmt.Errorf("failed to load configuration from persistence") + } - // Register with types registry - tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { - registry.RegisterTask(detector, scheduler) - }) + // Update the task definition's config + globalTaskDef.Config = newConfig - // Register with UI registry using the same instances - tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { - RegisterUI(uiRegistry, detector, scheduler) - }) + glog.V(1).Infof("Updated vacuum task configuration from persistence") + return nil } diff --git a/weed/worker/tasks/vacuum/vacuum_scheduler.go b/weed/worker/tasks/vacuum/vacuum_scheduler.go deleted file mode 100644 index 2b67a9f40..000000000 --- a/weed/worker/tasks/vacuum/vacuum_scheduler.go +++ /dev/null @@ -1,111 +0,0 @@ -package vacuum - -import ( - "time" - - "github.com/seaweedfs/seaweedfs/weed/worker/types" -) - -// VacuumScheduler implements vacuum task scheduling using code instead of schemas -type VacuumScheduler struct { - enabled bool - maxConcurrent int - minInterval time.Duration -} - -// Compile-time interface assertions -var ( - _ types.TaskScheduler = (*VacuumScheduler)(nil) -) - -// NewVacuumScheduler creates a new simple vacuum scheduler -func NewVacuumScheduler() *VacuumScheduler { - return &VacuumScheduler{ - enabled: true, - maxConcurrent: 2, - minInterval: 6 * time.Hour, - } -} - -// GetTaskType returns the task type -func (s *VacuumScheduler) GetTaskType() types.TaskType { - return types.TaskTypeVacuum -} - -// CanScheduleNow determines if a vacuum task can be scheduled right now -func (s *VacuumScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { - // Check if scheduler is enabled - if !s.enabled { - return false - } - - // Check 
concurrent limit - runningVacuumCount := 0 - for _, runningTask := range runningTasks { - if runningTask.Type == types.TaskTypeVacuum { - runningVacuumCount++ - } - } - - if runningVacuumCount >= s.maxConcurrent { - return false - } - - // Check if there's an available worker with vacuum capability - for _, worker := range availableWorkers { - if worker.CurrentLoad < worker.MaxConcurrent { - for _, capability := range worker.Capabilities { - if capability == types.TaskTypeVacuum { - return true - } - } - } - } - - return false -} - -// GetPriority returns the priority for this task -func (s *VacuumScheduler) GetPriority(task *types.Task) types.TaskPriority { - // Could adjust priority based on task parameters - if params, ok := task.Parameters["garbage_ratio"].(float64); ok { - if params > 0.8 { - return types.TaskPriorityHigh - } - } - return task.Priority -} - -// GetMaxConcurrent returns max concurrent tasks of this type -func (s *VacuumScheduler) GetMaxConcurrent() int { - return s.maxConcurrent -} - -// GetDefaultRepeatInterval returns the default interval to wait before repeating vacuum tasks -func (s *VacuumScheduler) GetDefaultRepeatInterval() time.Duration { - return s.minInterval -} - -// IsEnabled returns whether this scheduler is enabled -func (s *VacuumScheduler) IsEnabled() bool { - return s.enabled -} - -// Configuration setters - -func (s *VacuumScheduler) SetEnabled(enabled bool) { - s.enabled = enabled -} - -func (s *VacuumScheduler) SetMaxConcurrent(max int) { - s.maxConcurrent = max -} - -func (s *VacuumScheduler) SetMinInterval(interval time.Duration) { - s.minInterval = interval -} - -// GetMinInterval returns the minimum interval -func (s *VacuumScheduler) GetMinInterval() time.Duration { - return s.minInterval -}
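
A note on the task logger introduced in this change: task.log is written as newline-delimited JSON, which is why ReadTaskLogs can replay it with a plain json.Decoder until io.EOF, and metadata.json sits next to it in the same per-task directory. Below is a minimal sketch of reading a finished task's logs back, assuming these helpers are exported from the weed/worker/tasks package; the log directory path is illustrative, since the real layout is whatever FileTaskLogger created under its configured BaseLogDir.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)

func main() {
	// Hypothetical per-task log directory created by FileTaskLogger.
	logDir := "/tmp/seaweedfs-task-logs/task-vacuum-42"

	// metadata.json is written by saveMetadata alongside task.log.
	meta, err := tasks.GetTaskLogMetadata(logDir)
	if err != nil {
		fmt.Println("metadata:", err)
		return
	}
	fmt.Printf("metadata: %+v\n", meta)

	// task.log is one JSON object per line; ReadTaskLogs decodes until EOF.
	entries, err := tasks.ReadTaskLogs(logDir)
	if err != nil {
		fmt.Println("logs:", err)
		return
	}
	for _, e := range entries {
		fmt.Printf("%s [%s] %s\n", e.Timestamp.Format("15:04:05"), e.Level, e.Message)
	}
}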
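A note on mergeConfigs in ui_base.go: because only non-zero source fields are copied, a detector or scheduler value can never reset a field back to its zero value; for example, it cannot flip a default-true Enabled to false. A self-contained replica of the merge logic that demonstrates this edge case (the cfg type and merge helper are illustrative stand-ins, not code from the repository):

package main

import (
	"fmt"
	"reflect"
)

// merge mirrors mergeConfigs in ui_base.go: copy only non-zero,
// same-typed, settable fields from source into dest.
func merge[T any](dest *T, source T) {
	dv := reflect.ValueOf(dest).Elem()
	sv := reflect.ValueOf(source)
	if dv.Kind() != reflect.Struct || sv.Kind() != reflect.Struct {
		return
	}
	for i := 0; i < dv.NumField(); i++ {
		df, sf := dv.Field(i), sv.Field(i)
		if df.CanSet() && !sf.IsZero() && df.Type() == sf.Type() {
			df.Set(sf)
		}
	}
}

type cfg struct {
	Enabled       bool
	MaxConcurrent int
}

func main() {
	dst := cfg{Enabled: true, MaxConcurrent: 2}
	merge(&dst, cfg{MaxConcurrent: 4}) // non-zero field wins
	merge(&dst, cfg{Enabled: false})   // zero value is ignored; Enabled stays true
	fmt.Printf("%+v\n", dst)           // prints {Enabled:true MaxConcurrent:4}
}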
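A note on the converters in vacuum/config.go: ToTaskPolicy stores MinVolumeAgeSeconds as whole hours via integer division, so FromTaskPolicy can only recover multiples of 3600 seconds; sub-hour settings are silently truncated on a round trip. A tiny standalone illustration of the loss:

package main

import "fmt"

func main() {
	// Mirrors the conversion pair in vacuum/config.go:
	// ToTaskPolicy:   MinVolumeAgeHours   = int32(seconds / 3600)
	// FromTaskPolicy: MinVolumeAgeSeconds = int(hours * 3600)
	seconds := 90 * 60                // a 90-minute minimum volume age
	hours := int32(seconds / 3600)    // integer division truncates to 1
	roundTripped := int(hours) * 3600 // comes back as 3600, not 5400
	fmt.Println(seconds, "->", hours, "h ->", roundTripped)
}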
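A note on LoadConfigFromPersistence: it probes its interface{} argument with an anonymous-interface type assertion, so the vacuum package never has to import the admin persistence type; any value with a matching LoadVacuumTaskPolicy method qualifies. A self-contained sketch of that optional-interface pattern (policy, store, and load are hypothetical stand-ins for the real types):

package main

import "fmt"

type policy struct{ Enabled bool }

// store stands in for the admin config persistence; only its method set matters.
type store struct{}

func (store) LoadVacuumTaskPolicy() (*policy, error) { return &policy{Enabled: true}, nil }

// load mirrors LoadConfigFromPersistence: probe for the method, else fall back to defaults.
func load(p interface{}) *policy {
	if s, ok := p.(interface {
		LoadVacuumTaskPolicy() (*policy, error)
	}); ok {
		if pol, err := s.LoadVacuumTaskPolicy(); err == nil && pol != nil {
			return pol
		}
	}
	return &policy{} // defaults
}

func main() {
	fmt.Printf("%+v\n", load(store{})) // &{Enabled:true}
	fmt.Printf("%+v\n", load("nope"))  // &{Enabled:false} (fallback)
}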
