author    Chris Lu <chrislusf@users.noreply.github.com>  2025-07-30 12:38:03 -0700
committer GitHub <noreply@github.com>  2025-07-30 12:38:03 -0700
commit    891a2fb6ebc324329f5330a140b8cacff3899db4 (patch)
tree      d02aaa80a909e958aea831f206b3240b0237d7b7 /weed/worker/tasks/task.go
parent    64198dad8346fe284cbef944fe01ff0d062c147d (diff)
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design
* added simulation as tests
* reorganized the codebase to move the simulation framework and tests into their own dedicated package
* integration test; EC worker task
* remove "enhanced" reference
* start master, volume servers, filer. Current status:
  * ✅ Master: healthy and running (port 9333)
  * ✅ Filer: healthy and running (port 8888)
  * ✅ Volume servers: all 6 servers running (ports 8080-8085)
  * 🔄 Admin/workers: will start when dependencies are ready
* generate write load
* tasks are assigned
* admin starts with gRPC port; worker has its own working directory
* Update .gitignore
* working worker and admin; task detection is not working yet
* compiles; detection uses volumeSizeLimitMB from master
* compiles
* worker retries connecting to admin
* build and restart
* rendering pending tasks
* skip task ID column
* sticky worker id
* test canScheduleTaskNow
* worker reconnects to admin
* clean up logs
* worker registers itself first
* worker can run EC work and report status, but:
  1. one volume should not be repeatedly worked on
  2. EC shards need to be distributed and the source data should be deleted
* move EC task logic
* listing EC shards
* local copy, EC; still needs distribution
* EC is mostly working now
* distribution of EC shards needs improvement
* need configuration to enable EC
* show EC volumes
* interval field UI component
* rename
* integration test with vacuuming
* garbage percentage threshold
* fix warning
* display EC shard sizes
* fix EC volumes list
* Update ui.go
* show default values
* ensure correct default value
* MaintenanceConfig uses ConfigField
* use schema-defined defaults
* config
* reduce duplication
* refactor to use BaseUIProvider
* each task registers its schema
* checkECEncodingCandidate uses ecDetector
* use vacuumDetector
* use volumeSizeLimitMB
* remove remove
* remove unused
* refactor
* use new framework
* remove v2 reference
* refactor
* left menu can scroll now
* fix: the maintenance manager was not being initialized when no data directory was configured for persistent storage
* saving config
* Update task_config_schema_templ.go
* enable/disable tasks
* protobuf-encoded task configurations
* fix system settings
* use UI component
* remove logs
* interface{} reduction
* reduce interface{}
* reduce interface{}
* avoid from/to map
* reduce interface{}
* refactor
* keep it DRY
* added logging
* debug messages
* debug level
* debug
* show the log caller line
* use configured task policy
* log level
* handle admin heartbeat response
* Update worker.go
* fix EC rack and DC count
* report task status to admin server
* fix task logging, simplify interface checking, use erasure_coding constants
* factor in empty volume servers during task planning
* volume.list adds disk id
* track disk id also
* fix locking for scheduled and manual scanning
* add active topology
* simplify task detector
* EC task completed, but shards are not showing up
* implement EC in ec_typed.go
* adjust log level
* dedup
* implementing EC: copying shards and only the ecx files
* use disk id when distributing EC shards (see the sketch after this list):
  * 🎯 Planning: ActiveTopology creates a DestinationPlan with a specific TargetDisk
  * 📦 Task creation: maintenance_integration.go creates an ECDestination with DiskId
  * 🚀 Task execution: the EC task passes DiskId in VolumeEcShardsCopyRequest
  * 💾 Volume server: receives disk_id and stores shards on the specific disk (vs.store.Locations[req.DiskId])
  * 📂 File system: EC shards and metadata land in the exact disk directory planned
* delete original volume from all locations
* clean up existing shard locations
* local encoding and distributing
* Update docker/admin_integration/EC-TESTING-README.md (Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>)
* check volume id range
* simplify
* fix tests
* fix types
* clean up logs and tests

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
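A minimal sketch of the disk-pinning flow described in the list above. The ECDestination struct and the exact VolumeEcShardsCopyRequest field names follow the commit message rather than a verified API, so treat everything here as illustrative:

```go
package ecplan

import (
	"context"

	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

// ECDestination mirrors the shape implied by the commit message: a target
// volume server plus the specific disk chosen by the planner. Illustrative
// definition, not the actual type.
type ECDestination struct {
	Node   string
	DiskId uint32
}

// copyShardsToPlannedDisk asks the destination server to copy EC shards and
// pin them to the planned disk. Field names follow the commit message; the
// real request may differ.
func copyShardsToPlannedDisk(ctx context.Context, client volume_server_pb.VolumeServerClient,
	volumeID uint32, collection string, shardIDs []uint32, source string, dest ECDestination) error {
	_, err := client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
		VolumeId:       volumeID,
		Collection:     collection,
		ShardIds:       shardIDs,
		CopyEcxFile:    true,
		SourceDataNode: source,
		DiskId:         dest.DiskId, // the field this PR threads through to the volume server
	})
	return err
}
```

On the receiving side, per the commit message, the volume server resolves the planned disk via vs.store.Locations[req.DiskId] so shards land in the exact directory the planner selected.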
Diffstat (limited to 'weed/worker/tasks/task.go')
-rw-r--r-- weed/worker/tasks/task.go | 198
1 file changed, 192 insertions(+), 6 deletions(-)
diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go
index 482233f60..15369c137 100644
--- a/weed/worker/tasks/task.go
+++ b/weed/worker/tasks/task.go
@@ -2,29 +2,69 @@ package tasks
import (
"context"
+ "fmt"
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// BaseTask provides common functionality for all tasks
type BaseTask struct {
taskType types.TaskType
+ taskID string
progress float64
cancelled bool
mutex sync.RWMutex
startTime time.Time
estimatedDuration time.Duration
+ logger TaskLogger
+ loggerConfig TaskLoggerConfig
+ progressCallback func(float64) // Callback function for progress updates
}
// NewBaseTask creates a new base task
func NewBaseTask(taskType types.TaskType) *BaseTask {
return &BaseTask{
- taskType: taskType,
- progress: 0.0,
- cancelled: false,
+ taskType: taskType,
+ progress: 0.0,
+ cancelled: false,
+ loggerConfig: DefaultTaskLoggerConfig(),
+ }
+}
+
+// NewBaseTaskWithLogger creates a new base task with custom logger configuration
+func NewBaseTaskWithLogger(taskType types.TaskType, loggerConfig TaskLoggerConfig) *BaseTask {
+ return &BaseTask{
+ taskType: taskType,
+ progress: 0.0,
+ cancelled: false,
+ loggerConfig: loggerConfig,
+ }
+}
+
+// InitializeLogger initializes the task logger with task details
+func (t *BaseTask) InitializeLogger(taskID string, workerID string, params types.TaskParams) error {
+ return t.InitializeTaskLogger(taskID, workerID, params)
+}
+
+// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface)
+func (t *BaseTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ t.taskID = taskID
+
+ logger, err := NewTaskLogger(taskID, t.taskType, workerID, params, t.loggerConfig)
+ if err != nil {
+ return fmt.Errorf("failed to initialize task logger: %w", err)
}
+
+ t.logger = logger
+ t.logger.Info("BaseTask initialized for task %s (type: %s)", taskID, t.taskType)
+
+ return nil
}
// Type returns the task type
@@ -39,24 +79,47 @@ func (t *BaseTask) GetProgress() float64 {
return t.progress
}
-// SetProgress sets the current progress
+// SetProgress sets the current progress and logs it
func (t *BaseTask) SetProgress(progress float64) {
t.mutex.Lock()
- defer t.mutex.Unlock()
if progress < 0 {
progress = 0
}
if progress > 100 {
progress = 100
}
+ oldProgress := t.progress
+ callback := t.progressCallback
t.progress = progress
+ t.mutex.Unlock()
+
+ // Log progress change
+ if t.logger != nil && progress != oldProgress {
+ t.logger.LogProgress(progress, fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress))
+ }
+
+ // Call progress callback if set
+ if callback != nil && progress != oldProgress {
+ callback(progress)
+ }
}
// Cancel cancels the task
func (t *BaseTask) Cancel() error {
t.mutex.Lock()
defer t.mutex.Unlock()
+
+ if t.cancelled {
+ return nil
+ }
+
t.cancelled = true
+
+ if t.logger != nil {
+ t.logger.LogStatus("cancelled", "Task cancelled by request")
+ t.logger.Warning("Task %s was cancelled", t.taskID)
+ }
+
return nil
}
@@ -72,6 +135,10 @@ func (t *BaseTask) SetStartTime(startTime time.Time) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.startTime = startTime
+
+ if t.logger != nil {
+ t.logger.LogStatus("running", fmt.Sprintf("Task started at %s", startTime.Format(time.RFC3339)))
+ }
}
// GetStartTime returns the task start time
@@ -86,6 +153,13 @@ func (t *BaseTask) SetEstimatedDuration(duration time.Duration) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.estimatedDuration = duration
+
+ if t.logger != nil {
+ t.logger.LogWithFields("INFO", "Estimated duration set", map[string]interface{}{
+ "estimated_duration": duration.String(),
+ "estimated_seconds": duration.Seconds(),
+ })
+ }
}
// GetEstimatedDuration returns the estimated duration
@@ -95,11 +169,115 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration {
return t.estimatedDuration
}
-// ExecuteTask is a wrapper that handles common task execution logic
+// SetProgressCallback sets the progress callback function
+func (t *BaseTask) SetProgressCallback(callback func(float64)) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.progressCallback = callback
+}
+
+// SetLoggerConfig sets the logger configuration for this task
+func (t *BaseTask) SetLoggerConfig(config TaskLoggerConfig) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.loggerConfig = config
+}
+
+// GetLogger returns the task logger
+func (t *BaseTask) GetLogger() TaskLogger {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+ return t.logger
+}
+
+// GetTaskLogger returns the task logger (LoggerProvider interface)
+func (t *BaseTask) GetTaskLogger() TaskLogger {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+ return t.logger
+}
+
+// LogInfo logs an info message
+func (t *BaseTask) LogInfo(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Info(message, args...)
+ }
+}
+
+// LogWarning logs a warning message
+func (t *BaseTask) LogWarning(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Warning(message, args...)
+ }
+}
+
+// LogError logs an error message
+func (t *BaseTask) LogError(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Error(message, args...)
+ }
+}
+
+// LogDebug logs a debug message
+func (t *BaseTask) LogDebug(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Debug(message, args...)
+ }
+}
+
+// LogWithFields logs a message with structured fields
+func (t *BaseTask) LogWithFields(level string, message string, fields map[string]interface{}) {
+ if t.logger != nil {
+ t.logger.LogWithFields(level, message, fields)
+ }
+}
+
+// FinishTask finalizes the task and closes the logger
+func (t *BaseTask) FinishTask(success bool, errorMsg string) error {
+ if t.logger != nil {
+ if success {
+ t.logger.LogStatus("completed", "Task completed successfully")
+ t.logger.Info("Task %s finished successfully", t.taskID)
+ } else {
+ t.logger.LogStatus("failed", fmt.Sprintf("Task failed: %s", errorMsg))
+ t.logger.Error("Task %s failed: %s", t.taskID, errorMsg)
+ }
+
+ // Close logger
+ if err := t.logger.Close(); err != nil {
+ glog.Errorf("Failed to close task logger: %v", err)
+ }
+ }
+
+ return nil
+}
+
+// ExecuteTask is a wrapper that handles common task execution logic with logging
func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error {
+ // Initialize logger if not already done
+ if t.logger == nil {
+ // Generate a temporary task ID if none provided
+ if t.taskID == "" {
+ t.taskID = fmt.Sprintf("task_%d", time.Now().UnixNano())
+ }
+
+ workerID := "unknown"
+ if err := t.InitializeLogger(t.taskID, workerID, params); err != nil {
+ glog.Warningf("Failed to initialize task logger: %v", err)
+ }
+ }
+
t.SetStartTime(time.Now())
t.SetProgress(0)
+ if t.logger != nil {
+ t.logger.LogWithFields("INFO", "Task execution started", map[string]interface{}{
+ "volume_id": params.VolumeID,
+ "server": params.Server,
+ "collection": params.Collection,
+ })
+ }
+
// Create a context that can be cancelled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -114,21 +292,29 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe
// Check cancellation every second
}
}
+ t.LogWarning("Task cancellation detected, cancelling context")
cancel()
}()
// Execute the actual task
+ t.LogInfo("Starting task executor")
err := executor(ctx, params)
if err != nil {
+ t.LogError("Task executor failed: %v", err)
+ t.FinishTask(false, err.Error())
return err
}
if t.IsCancelled() {
+ t.LogWarning("Task was cancelled during execution")
+ t.FinishTask(false, "cancelled")
return context.Canceled
}
t.SetProgress(100)
+ t.LogInfo("Task executor completed successfully")
+ t.FinishTask(true, "")
return nil
}
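For orientation, a minimal usage sketch of the BaseTask API added in this patch. Only the methods visible in the diff (NewBaseTask, SetProgressCallback, SetProgress, ExecuteTask) are assumed; the task type string and the TaskParams values are placeholders:

```go
package main

import (
	"context"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func main() {
	// BaseTask is normally embedded in a concrete task type; it is driven
	// directly here for brevity. "erasure_coding" is a placeholder value.
	task := tasks.NewBaseTask(types.TaskType("erasure_coding"))

	// Receive progress updates whenever the executor calls SetProgress.
	task.SetProgressCallback(func(p float64) {
		fmt.Printf("progress: %.1f%%\n", p)
	})

	params := types.TaskParams{VolumeID: 42, Server: "localhost:8080", Collection: "test"}

	// ExecuteTask initializes the logger if needed, records the start time,
	// runs the executor, and calls FinishTask on success, failure, or cancel.
	err := task.ExecuteTask(context.Background(), params, func(ctx context.Context, p types.TaskParams) error {
		task.SetProgress(50) // logged and forwarded to the callback above
		// ... real work goes here, honoring ctx for cancellation ...
		return nil
	})
	if err != nil {
		fmt.Println("task failed:", err)
	}
}
```

Note how the patch releases the mutex before invoking the progress callback in SetProgress: a callback that itself touches the task (as above) would otherwise deadlock.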