diff options
Diffstat (limited to 'weed/worker/tasks/task.go')
| -rw-r--r-- | weed/worker/tasks/task.go | 198 |
1 files changed, 192 insertions, 6 deletions
diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go index 482233f60..15369c137 100644 --- a/weed/worker/tasks/task.go +++ b/weed/worker/tasks/task.go @@ -2,29 +2,69 @@ package tasks import ( "context" + "fmt" "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) // BaseTask provides common functionality for all tasks type BaseTask struct { taskType types.TaskType + taskID string progress float64 cancelled bool mutex sync.RWMutex startTime time.Time estimatedDuration time.Duration + logger TaskLogger + loggerConfig TaskLoggerConfig + progressCallback func(float64) // Callback function for progress updates } // NewBaseTask creates a new base task func NewBaseTask(taskType types.TaskType) *BaseTask { return &BaseTask{ - taskType: taskType, - progress: 0.0, - cancelled: false, + taskType: taskType, + progress: 0.0, + cancelled: false, + loggerConfig: DefaultTaskLoggerConfig(), + } +} + +// NewBaseTaskWithLogger creates a new base task with custom logger configuration +func NewBaseTaskWithLogger(taskType types.TaskType, loggerConfig TaskLoggerConfig) *BaseTask { + return &BaseTask{ + taskType: taskType, + progress: 0.0, + cancelled: false, + loggerConfig: loggerConfig, + } +} + +// InitializeLogger initializes the task logger with task details +func (t *BaseTask) InitializeLogger(taskID string, workerID string, params types.TaskParams) error { + return t.InitializeTaskLogger(taskID, workerID, params) +} + +// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface) +func (t *BaseTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.taskID = taskID + + logger, err := NewTaskLogger(taskID, t.taskType, workerID, params, t.loggerConfig) + if err != nil { + return fmt.Errorf("failed to initialize task logger: %w", err) } + + t.logger = logger + t.logger.Info("BaseTask initialized for task %s (type: %s)", taskID, t.taskType) + + return nil } // Type returns the task type @@ -39,24 +79,47 @@ func (t *BaseTask) GetProgress() float64 { return t.progress } -// SetProgress sets the current progress +// SetProgress sets the current progress and logs it func (t *BaseTask) SetProgress(progress float64) { t.mutex.Lock() - defer t.mutex.Unlock() if progress < 0 { progress = 0 } if progress > 100 { progress = 100 } + oldProgress := t.progress + callback := t.progressCallback t.progress = progress + t.mutex.Unlock() + + // Log progress change + if t.logger != nil && progress != oldProgress { + t.logger.LogProgress(progress, fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress)) + } + + // Call progress callback if set + if callback != nil && progress != oldProgress { + callback(progress) + } } // Cancel cancels the task func (t *BaseTask) Cancel() error { t.mutex.Lock() defer t.mutex.Unlock() + + if t.cancelled { + return nil + } + t.cancelled = true + + if t.logger != nil { + t.logger.LogStatus("cancelled", "Task cancelled by request") + t.logger.Warning("Task %s was cancelled", t.taskID) + } + return nil } @@ -72,6 +135,10 @@ func (t *BaseTask) SetStartTime(startTime time.Time) { t.mutex.Lock() defer t.mutex.Unlock() t.startTime = startTime + + if t.logger != nil { + t.logger.LogStatus("running", fmt.Sprintf("Task started at %s", startTime.Format(time.RFC3339))) + } } // GetStartTime returns the task start time @@ -86,6 +153,13 @@ func (t *BaseTask) SetEstimatedDuration(duration time.Duration) { t.mutex.Lock() defer t.mutex.Unlock() t.estimatedDuration = duration + + if t.logger != nil { + t.logger.LogWithFields("INFO", "Estimated duration set", map[string]interface{}{ + "estimated_duration": duration.String(), + "estimated_seconds": duration.Seconds(), + }) + } } // GetEstimatedDuration returns the estimated duration @@ -95,11 +169,115 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration { return t.estimatedDuration } -// ExecuteTask is a wrapper that handles common task execution logic +// SetProgressCallback sets the progress callback function +func (t *BaseTask) SetProgressCallback(callback func(float64)) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.progressCallback = callback +} + +// SetLoggerConfig sets the logger configuration for this task +func (t *BaseTask) SetLoggerConfig(config TaskLoggerConfig) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.loggerConfig = config +} + +// GetLogger returns the task logger +func (t *BaseTask) GetLogger() TaskLogger { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.logger +} + +// GetTaskLogger returns the task logger (LoggerProvider interface) +func (t *BaseTask) GetTaskLogger() TaskLogger { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.logger +} + +// LogInfo logs an info message +func (t *BaseTask) LogInfo(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Info(message, args...) + } +} + +// LogWarning logs a warning message +func (t *BaseTask) LogWarning(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Warning(message, args...) + } +} + +// LogError logs an error message +func (t *BaseTask) LogError(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Error(message, args...) + } +} + +// LogDebug logs a debug message +func (t *BaseTask) LogDebug(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Debug(message, args...) + } +} + +// LogWithFields logs a message with structured fields +func (t *BaseTask) LogWithFields(level string, message string, fields map[string]interface{}) { + if t.logger != nil { + t.logger.LogWithFields(level, message, fields) + } +} + +// FinishTask finalizes the task and closes the logger +func (t *BaseTask) FinishTask(success bool, errorMsg string) error { + if t.logger != nil { + if success { + t.logger.LogStatus("completed", "Task completed successfully") + t.logger.Info("Task %s finished successfully", t.taskID) + } else { + t.logger.LogStatus("failed", fmt.Sprintf("Task failed: %s", errorMsg)) + t.logger.Error("Task %s failed: %s", t.taskID, errorMsg) + } + + // Close logger + if err := t.logger.Close(); err != nil { + glog.Errorf("Failed to close task logger: %v", err) + } + } + + return nil +} + +// ExecuteTask is a wrapper that handles common task execution logic with logging func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error { + // Initialize logger if not already done + if t.logger == nil { + // Generate a temporary task ID if none provided + if t.taskID == "" { + t.taskID = fmt.Sprintf("task_%d", time.Now().UnixNano()) + } + + workerID := "unknown" + if err := t.InitializeLogger(t.taskID, workerID, params); err != nil { + glog.Warningf("Failed to initialize task logger: %v", err) + } + } + t.SetStartTime(time.Now()) t.SetProgress(0) + if t.logger != nil { + t.logger.LogWithFields("INFO", "Task execution started", map[string]interface{}{ + "volume_id": params.VolumeID, + "server": params.Server, + "collection": params.Collection, + }) + } + // Create a context that can be cancelled ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -114,21 +292,29 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe // Check cancellation every second } } + t.LogWarning("Task cancellation detected, cancelling context") cancel() }() // Execute the actual task + t.LogInfo("Starting task executor") err := executor(ctx, params) if err != nil { + t.LogError("Task executor failed: %v", err) + t.FinishTask(false, err.Error()) return err } if t.IsCancelled() { + t.LogWarning("Task was cancelled during execution") + t.FinishTask(false, "cancelled") return context.Canceled } t.SetProgress(100) + t.LogInfo("Task executor completed successfully") + t.FinishTask(true, "") return nil } |
