aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks')
-rw-r--r--weed/worker/tasks/balance/balance.go8
-rw-r--r--weed/worker/tasks/task.go105
2 files changed, 112 insertions, 1 deletions
diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go
index ea867d950..657c32df4 100644
--- a/weed/worker/tasks/balance/balance.go
+++ b/weed/worker/tasks/balance/balance.go
@@ -30,6 +30,7 @@ func NewTask(server string, volumeID uint32, collection string) *Task {
// Execute executes the balance task
func (t *Task) Execute(params types.TaskParams) error {
+ t.LogInfo("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection)
glog.Infof("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection)
// Simulate balance operation with progress updates
@@ -45,18 +46,23 @@ func (t *Task) Execute(params types.TaskParams) error {
{"Verifying balance", 1 * time.Second, 100},
}
- for _, step := range steps {
+ for i, step := range steps {
if t.IsCancelled() {
+ t.LogWarning("Balance task cancelled at step %d: %s", i+1, step.name)
return fmt.Errorf("balance task cancelled")
}
+ t.LogInfo("Starting step %d/%d: %s", i+1, len(steps), step.name)
glog.V(1).Infof("Balance task step: %s", step.name)
t.SetProgress(step.progress)
// Simulate work
time.Sleep(step.duration)
+
+ t.LogDebug("Completed step %d/%d: %s (progress: %.1f%%)", i+1, len(steps), step.name, step.progress)
}
+ t.LogInfo("Balance task completed successfully for volume %d on server %s", t.volumeID, t.server)
glog.Infof("Balance task completed for volume %d on server %s", t.volumeID, t.server)
return nil
}
diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go
index 482233f60..ceeb5f413 100644
--- a/weed/worker/tasks/task.go
+++ b/weed/worker/tasks/task.go
@@ -2,6 +2,10 @@ package tasks
import (
"context"
+ "fmt"
+ "log"
+ "os"
+ "path/filepath"
"sync"
"time"
@@ -16,6 +20,11 @@ type BaseTask struct {
mutex sync.RWMutex
startTime time.Time
estimatedDuration time.Duration
+
+ // Logging functionality
+ logFile *os.File
+ logger *log.Logger
+ logFilePath string
}
// NewBaseTask creates a new base task
@@ -95,6 +104,16 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration {
return t.estimatedDuration
}
+// InitializeTaskLogging sets up task-specific logging - can be called by worker or tasks
+func (t *BaseTask) InitializeTaskLogging(workingDir, taskID string) error {
+ return t.initializeLogging(workingDir, taskID)
+}
+
+// CloseTaskLogging properly closes task logging - can be called by worker or tasks
+func (t *BaseTask) CloseTaskLogging() {
+ t.closeLogging()
+}
+
// ExecuteTask is a wrapper that handles common task execution logic
func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error {
t.SetStartTime(time.Now())
@@ -132,6 +151,92 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe
return nil
}
+// initializeLogging sets up task-specific logging to a file in the working directory
+func (t *BaseTask) initializeLogging(workingDir, taskID string) error {
+ if workingDir == "" {
+ // If no working directory specified, skip file logging
+ return nil
+ }
+
+ // Ensure working directory exists
+ if err := os.MkdirAll(workingDir, 0755); err != nil {
+ return fmt.Errorf("failed to create working directory %s: %v", workingDir, err)
+ }
+
+ // Create task-specific log file
+ timestamp := time.Now().Format("20060102_150405")
+ logFileName := fmt.Sprintf("%s_%s_%s.log", t.taskType, taskID, timestamp)
+ t.logFilePath = filepath.Join(workingDir, logFileName)
+
+ logFile, err := os.OpenFile(t.logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+ if err != nil {
+ return fmt.Errorf("failed to create log file %s: %v", t.logFilePath, err)
+ }
+
+ t.logFile = logFile
+ t.logger = log.New(logFile, "", log.LstdFlags|log.Lmicroseconds)
+
+ // Log task initialization
+ t.LogInfo("Task %s initialized for %s", taskID, t.taskType)
+
+ return nil
+}
+
+// closeLogging properly closes the log file
+func (t *BaseTask) closeLogging() {
+ if t.logFile != nil {
+ t.LogInfo("Task completed, closing log file")
+ t.logFile.Close()
+ t.logFile = nil
+ t.logger = nil
+ }
+}
+
+// LogInfo writes an info-level log message to both glog and task log file
+func (t *BaseTask) LogInfo(format string, args ...interface{}) {
+ message := fmt.Sprintf(format, args...)
+
+ // Always log to task file if available
+ if t.logger != nil {
+ t.logger.Printf("[INFO] %s", message)
+ }
+}
+
+// LogError writes an error-level log message to both glog and task log file
+func (t *BaseTask) LogError(format string, args ...interface{}) {
+ message := fmt.Sprintf(format, args...)
+
+ // Always log to task file if available
+ if t.logger != nil {
+ t.logger.Printf("[ERROR] %s", message)
+ }
+}
+
+// LogDebug writes a debug-level log message to task log file
+func (t *BaseTask) LogDebug(format string, args ...interface{}) {
+ message := fmt.Sprintf(format, args...)
+
+ // Always log to task file if available
+ if t.logger != nil {
+ t.logger.Printf("[DEBUG] %s", message)
+ }
+}
+
+// LogWarning writes a warning-level log message to both glog and task log file
+func (t *BaseTask) LogWarning(format string, args ...interface{}) {
+ message := fmt.Sprintf(format, args...)
+
+ // Always log to task file if available
+ if t.logger != nil {
+ t.logger.Printf("[WARNING] %s", message)
+ }
+}
+
+// GetLogFilePath returns the path to the task's log file
+func (t *BaseTask) GetLogFilePath() string {
+ return t.logFilePath
+}
+
// TaskRegistry manages task factories
type TaskRegistry struct {
factories map[types.TaskType]types.TaskFactory