aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/task.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-07-27 11:04:51 -0700
committerchrislu <chris.lu@gmail.com>2025-07-27 11:04:51 -0700
commit1b64a0f0344c4c1066833a6d25b20dca9bb171e5 (patch)
tree9b8d8600b7b9d8fc43127cb2878b3dc625a60427 /weed/worker/tasks/task.go
parent9025b722410c605181b094fbeeed66de002fda27 (diff)
downloadseaweedfs-origin/worker-execute-ec-tasks.tar.xz
seaweedfs-origin/worker-execute-ec-tasks.zip
add generic logging and implement it for balancing taskorigin/worker-execute-ec-tasks
Diffstat (limited to 'weed/worker/tasks/task.go')
-rw-r--r--weed/worker/tasks/task.go105
1 files changed, 105 insertions, 0 deletions
diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go
index 482233f60..ceeb5f413 100644
--- a/weed/worker/tasks/task.go
+++ b/weed/worker/tasks/task.go
@@ -2,6 +2,10 @@ package tasks
import (
"context"
+ "fmt"
+ "log"
+ "os"
+ "path/filepath"
"sync"
"time"
@@ -16,6 +20,11 @@ type BaseTask struct {
mutex sync.RWMutex
startTime time.Time
estimatedDuration time.Duration
+
+ // Logging functionality
+ logFile *os.File
+ logger *log.Logger
+ logFilePath string
}
// NewBaseTask creates a new base task
@@ -95,6 +104,16 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration {
return t.estimatedDuration
}
+// InitializeTaskLogging sets up task-specific logging - can be called by worker or tasks
+func (t *BaseTask) InitializeTaskLogging(workingDir, taskID string) error {
+ return t.initializeLogging(workingDir, taskID)
+}
+
+// CloseTaskLogging properly closes task logging - can be called by worker or tasks
+func (t *BaseTask) CloseTaskLogging() {
+ t.closeLogging()
+}
+
// ExecuteTask is a wrapper that handles common task execution logic
func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error {
t.SetStartTime(time.Now())
@@ -132,6 +151,92 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe
return nil
}
+// initializeLogging sets up task-specific logging to a file in the working directory
+func (t *BaseTask) initializeLogging(workingDir, taskID string) error {
+ if workingDir == "" {
+ // If no working directory specified, skip file logging
+ return nil
+ }
+
+ // Ensure working directory exists
+ if err := os.MkdirAll(workingDir, 0755); err != nil {
+ return fmt.Errorf("failed to create working directory %s: %v", workingDir, err)
+ }
+
+ // Create task-specific log file
+ timestamp := time.Now().Format("20060102_150405")
+ logFileName := fmt.Sprintf("%s_%s_%s.log", t.taskType, taskID, timestamp)
+ t.logFilePath = filepath.Join(workingDir, logFileName)
+
+ logFile, err := os.OpenFile(t.logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+ if err != nil {
+ return fmt.Errorf("failed to create log file %s: %v", t.logFilePath, err)
+ }
+
+ t.logFile = logFile
+ t.logger = log.New(logFile, "", log.LstdFlags|log.Lmicroseconds)
+
+ // Log task initialization
+ t.LogInfo("Task %s initialized for %s", taskID, t.taskType)
+
+ return nil
+}
+
+// closeLogging properly closes the log file
+func (t *BaseTask) closeLogging() {
+ if t.logFile != nil {
+ t.LogInfo("Task completed, closing log file")
+ t.logFile.Close()
+ t.logFile = nil
+ t.logger = nil
+ }
+}
+
+// LogInfo writes an info-level log message to both glog and task log file
+func (t *BaseTask) LogInfo(format string, args ...interface{}) {
+ message := fmt.Sprintf(format, args...)
+
+ // Always log to task file if available
+ if t.logger != nil {
+ t.logger.Printf("[INFO] %s", message)
+ }
+}
+
+// LogError writes an error-level log message to both glog and task log file
+func (t *BaseTask) LogError(format string, args ...interface{}) {
+ message := fmt.Sprintf(format, args...)
+
+ // Always log to task file if available
+ if t.logger != nil {
+ t.logger.Printf("[ERROR] %s", message)
+ }
+}
+
+// LogDebug writes a debug-level log message to task log file
+func (t *BaseTask) LogDebug(format string, args ...interface{}) {
+ message := fmt.Sprintf(format, args...)
+
+ // Always log to task file if available
+ if t.logger != nil {
+ t.logger.Printf("[DEBUG] %s", message)
+ }
+}
+
+// LogWarning writes a warning-level log message to both glog and task log file
+func (t *BaseTask) LogWarning(format string, args ...interface{}) {
+ message := fmt.Sprintf(format, args...)
+
+ // Always log to task file if available
+ if t.logger != nil {
+ t.logger.Printf("[WARNING] %s", message)
+ }
+}
+
+// GetLogFilePath returns the path to the task's log file
+func (t *BaseTask) GetLogFilePath() string {
+ return t.logFilePath
+}
+
// TaskRegistry manages task factories
type TaskRegistry struct {
factories map[types.TaskType]types.TaskFactory