aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/task.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/task.go')
-rw-r--r--weed/worker/tasks/task.go198
1 files changed, 192 insertions, 6 deletions
diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go
index 482233f60..15369c137 100644
--- a/weed/worker/tasks/task.go
+++ b/weed/worker/tasks/task.go
@@ -2,29 +2,69 @@ package tasks
import (
"context"
+ "fmt"
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// BaseTask provides common functionality for all tasks
type BaseTask struct {
taskType types.TaskType
+ taskID string
progress float64
cancelled bool
mutex sync.RWMutex
startTime time.Time
estimatedDuration time.Duration
+ logger TaskLogger
+ loggerConfig TaskLoggerConfig
+ progressCallback func(float64) // Callback function for progress updates
}
// NewBaseTask creates a new base task
func NewBaseTask(taskType types.TaskType) *BaseTask {
return &BaseTask{
- taskType: taskType,
- progress: 0.0,
- cancelled: false,
+ taskType: taskType,
+ progress: 0.0,
+ cancelled: false,
+ loggerConfig: DefaultTaskLoggerConfig(),
+ }
+}
+
+// NewBaseTaskWithLogger creates a new base task with custom logger configuration
+func NewBaseTaskWithLogger(taskType types.TaskType, loggerConfig TaskLoggerConfig) *BaseTask {
+ return &BaseTask{
+ taskType: taskType,
+ progress: 0.0,
+ cancelled: false,
+ loggerConfig: loggerConfig,
+ }
+}
+
+// InitializeLogger initializes the task logger with task details
+func (t *BaseTask) InitializeLogger(taskID string, workerID string, params types.TaskParams) error {
+ return t.InitializeTaskLogger(taskID, workerID, params)
+}
+
+// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface)
+func (t *BaseTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ t.taskID = taskID
+
+ logger, err := NewTaskLogger(taskID, t.taskType, workerID, params, t.loggerConfig)
+ if err != nil {
+ return fmt.Errorf("failed to initialize task logger: %w", err)
}
+
+ t.logger = logger
+ t.logger.Info("BaseTask initialized for task %s (type: %s)", taskID, t.taskType)
+
+ return nil
}
// Type returns the task type
@@ -39,24 +79,47 @@ func (t *BaseTask) GetProgress() float64 {
return t.progress
}
-// SetProgress sets the current progress
+// SetProgress sets the current progress and logs it
func (t *BaseTask) SetProgress(progress float64) {
t.mutex.Lock()
- defer t.mutex.Unlock()
if progress < 0 {
progress = 0
}
if progress > 100 {
progress = 100
}
+ oldProgress := t.progress
+ callback := t.progressCallback
t.progress = progress
+ t.mutex.Unlock()
+
+ // Log progress change
+ if t.logger != nil && progress != oldProgress {
+ t.logger.LogProgress(progress, fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress))
+ }
+
+ // Call progress callback if set
+ if callback != nil && progress != oldProgress {
+ callback(progress)
+ }
}
// Cancel cancels the task
func (t *BaseTask) Cancel() error {
t.mutex.Lock()
defer t.mutex.Unlock()
+
+ if t.cancelled {
+ return nil
+ }
+
t.cancelled = true
+
+ if t.logger != nil {
+ t.logger.LogStatus("cancelled", "Task cancelled by request")
+ t.logger.Warning("Task %s was cancelled", t.taskID)
+ }
+
return nil
}
@@ -72,6 +135,10 @@ func (t *BaseTask) SetStartTime(startTime time.Time) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.startTime = startTime
+
+ if t.logger != nil {
+ t.logger.LogStatus("running", fmt.Sprintf("Task started at %s", startTime.Format(time.RFC3339)))
+ }
}
// GetStartTime returns the task start time
@@ -86,6 +153,13 @@ func (t *BaseTask) SetEstimatedDuration(duration time.Duration) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.estimatedDuration = duration
+
+ if t.logger != nil {
+ t.logger.LogWithFields("INFO", "Estimated duration set", map[string]interface{}{
+ "estimated_duration": duration.String(),
+ "estimated_seconds": duration.Seconds(),
+ })
+ }
}
// GetEstimatedDuration returns the estimated duration
@@ -95,11 +169,115 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration {
return t.estimatedDuration
}
-// ExecuteTask is a wrapper that handles common task execution logic
+// SetProgressCallback sets the progress callback function
+func (t *BaseTask) SetProgressCallback(callback func(float64)) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.progressCallback = callback
+}
+
+// SetLoggerConfig sets the logger configuration for this task
+func (t *BaseTask) SetLoggerConfig(config TaskLoggerConfig) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.loggerConfig = config
+}
+
+// GetLogger returns the task logger
+func (t *BaseTask) GetLogger() TaskLogger {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+ return t.logger
+}
+
+// GetTaskLogger returns the task logger (LoggerProvider interface)
+func (t *BaseTask) GetTaskLogger() TaskLogger {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+ return t.logger
+}
+
+// LogInfo logs an info message
+func (t *BaseTask) LogInfo(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Info(message, args...)
+ }
+}
+
+// LogWarning logs a warning message
+func (t *BaseTask) LogWarning(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Warning(message, args...)
+ }
+}
+
+// LogError logs an error message
+func (t *BaseTask) LogError(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Error(message, args...)
+ }
+}
+
+// LogDebug logs a debug message
+func (t *BaseTask) LogDebug(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Debug(message, args...)
+ }
+}
+
+// LogWithFields logs a message with structured fields
+func (t *BaseTask) LogWithFields(level string, message string, fields map[string]interface{}) {
+ if t.logger != nil {
+ t.logger.LogWithFields(level, message, fields)
+ }
+}
+
+// FinishTask finalizes the task and closes the logger
+func (t *BaseTask) FinishTask(success bool, errorMsg string) error {
+ if t.logger != nil {
+ if success {
+ t.logger.LogStatus("completed", "Task completed successfully")
+ t.logger.Info("Task %s finished successfully", t.taskID)
+ } else {
+ t.logger.LogStatus("failed", fmt.Sprintf("Task failed: %s", errorMsg))
+ t.logger.Error("Task %s failed: %s", t.taskID, errorMsg)
+ }
+
+ // Close logger
+ if err := t.logger.Close(); err != nil {
+ glog.Errorf("Failed to close task logger: %v", err)
+ }
+ }
+
+ return nil
+}
+
+// ExecuteTask is a wrapper that handles common task execution logic with logging
func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error {
+ // Initialize logger if not already done
+ if t.logger == nil {
+ // Generate a temporary task ID if none provided
+ if t.taskID == "" {
+ t.taskID = fmt.Sprintf("task_%d", time.Now().UnixNano())
+ }
+
+ workerID := "unknown"
+ if err := t.InitializeLogger(t.taskID, workerID, params); err != nil {
+ glog.Warningf("Failed to initialize task logger: %v", err)
+ }
+ }
+
t.SetStartTime(time.Now())
t.SetProgress(0)
+ if t.logger != nil {
+ t.logger.LogWithFields("INFO", "Task execution started", map[string]interface{}{
+ "volume_id": params.VolumeID,
+ "server": params.Server,
+ "collection": params.Collection,
+ })
+ }
+
// Create a context that can be cancelled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -114,21 +292,29 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe
// Check cancellation every second
}
}
+ t.LogWarning("Task cancellation detected, cancelling context")
cancel()
}()
// Execute the actual task
+ t.LogInfo("Starting task executor")
err := executor(ctx, params)
if err != nil {
+ t.LogError("Task executor failed: %v", err)
+ t.FinishTask(false, err.Error())
return err
}
if t.IsCancelled() {
+ t.LogWarning("Task was cancelled during execution")
+ t.FinishTask(false, "cancelled")
return context.Canceled
}
t.SetProgress(100)
+ t.LogInfo("Task executor completed successfully")
+ t.FinishTask(true, "")
return nil
}