diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-30 12:38:03 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-30 12:38:03 -0700 |
| commit | 891a2fb6ebc324329f5330a140b8cacff3899db4 (patch) | |
| tree | d02aaa80a909e958aea831f206b3240b0237d7b7 /weed/worker/tasks/task.go | |
| parent | 64198dad8346fe284cbef944fe01ff0d062c147d (diff) | |
| download | seaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.tar.xz seaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.zip | |
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design
* added simulation as tests
* reorganized the codebase to move the simulation framework and tests into their own dedicated package
* integration test. ec worker task
* remove "enhanced" reference
* start master, volume servers, filer
Current Status
✅ Master: Healthy and running (port 9333)
✅ Filer: Healthy and running (port 8888)
✅ Volume Servers: All 6 servers running (ports 8080-8085)
🔄 Admin/Workers: Will start when dependencies are ready
* generate write load
* tasks are assigned
* admin start wtih grpc port. worker has its own working directory
* Update .gitignore
* working worker and admin. Task detection is not working yet.
* compiles, detection uses volumeSizeLimitMB from master
* compiles
* worker retries connecting to admin
* build and restart
* rendering pending tasks
* skip task ID column
* sticky worker id
* test canScheduleTaskNow
* worker reconnect to admin
* clean up logs
* worker register itself first
* worker can run ec work and report status
but:
1. one volume should not be repeatedly worked on.
2. ec shards needs to be distributed and source data should be deleted.
* move ec task logic
* listing ec shards
* local copy, ec. Need to distribute.
* ec is mostly working now
* distribution of ec shards needs improvement
* need configuration to enable ec
* show ec volumes
* interval field UI component
* rename
* integration test with vauuming
* garbage percentage threshold
* fix warning
* display ec shard sizes
* fix ec volumes list
* Update ui.go
* show default values
* ensure correct default value
* MaintenanceConfig use ConfigField
* use schema defined defaults
* config
* reduce duplication
* refactor to use BaseUIProvider
* each task register its schema
* checkECEncodingCandidate use ecDetector
* use vacuumDetector
* use volumeSizeLimitMB
* remove
remove
* remove unused
* refactor
* use new framework
* remove v2 reference
* refactor
* left menu can scroll now
* The maintenance manager was not being initialized when no data directory was configured for persistent storage.
* saving config
* Update task_config_schema_templ.go
* enable/disable tasks
* protobuf encoded task configurations
* fix system settings
* use ui component
* remove logs
* interface{} Reduction
* reduce interface{}
* reduce interface{}
* avoid from/to map
* reduce interface{}
* refactor
* keep it DRY
* added logging
* debug messages
* debug level
* debug
* show the log caller line
* use configured task policy
* log level
* handle admin heartbeat response
* Update worker.go
* fix EC rack and dc count
* Report task status to admin server
* fix task logging, simplify interface checking, use erasure_coding constants
* factor in empty volume server during task planning
* volume.list adds disk id
* track disk id also
* fix locking scheduled and manual scanning
* add active topology
* simplify task detector
* ec task completed, but shards are not showing up
* implement ec in ec_typed.go
* adjust log level
* dedup
* implementing ec copying shards and only ecx files
* use disk id when distributing ec shards
🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned
* Delete original volume from all locations
* clean up existing shard locations
* local encoding and distributing
* Update docker/admin_integration/EC-TESTING-README.md
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* check volume id range
* simplify
* fix tests
* fix types
* clean up logs and tests
---------
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Diffstat (limited to 'weed/worker/tasks/task.go')
| -rw-r--r-- | weed/worker/tasks/task.go | 198 |
1 files changed, 192 insertions, 6 deletions
diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go index 482233f60..15369c137 100644 --- a/weed/worker/tasks/task.go +++ b/weed/worker/tasks/task.go @@ -2,29 +2,69 @@ package tasks import ( "context" + "fmt" "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) // BaseTask provides common functionality for all tasks type BaseTask struct { taskType types.TaskType + taskID string progress float64 cancelled bool mutex sync.RWMutex startTime time.Time estimatedDuration time.Duration + logger TaskLogger + loggerConfig TaskLoggerConfig + progressCallback func(float64) // Callback function for progress updates } // NewBaseTask creates a new base task func NewBaseTask(taskType types.TaskType) *BaseTask { return &BaseTask{ - taskType: taskType, - progress: 0.0, - cancelled: false, + taskType: taskType, + progress: 0.0, + cancelled: false, + loggerConfig: DefaultTaskLoggerConfig(), + } +} + +// NewBaseTaskWithLogger creates a new base task with custom logger configuration +func NewBaseTaskWithLogger(taskType types.TaskType, loggerConfig TaskLoggerConfig) *BaseTask { + return &BaseTask{ + taskType: taskType, + progress: 0.0, + cancelled: false, + loggerConfig: loggerConfig, + } +} + +// InitializeLogger initializes the task logger with task details +func (t *BaseTask) InitializeLogger(taskID string, workerID string, params types.TaskParams) error { + return t.InitializeTaskLogger(taskID, workerID, params) +} + +// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface) +func (t *BaseTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.taskID = taskID + + logger, err := NewTaskLogger(taskID, t.taskType, workerID, params, t.loggerConfig) + if err != nil { + return fmt.Errorf("failed to initialize task logger: %w", err) } + + t.logger = logger + t.logger.Info("BaseTask initialized for task %s (type: %s)", taskID, t.taskType) + + return nil } // Type returns the task type @@ -39,24 +79,47 @@ func (t *BaseTask) GetProgress() float64 { return t.progress } -// SetProgress sets the current progress +// SetProgress sets the current progress and logs it func (t *BaseTask) SetProgress(progress float64) { t.mutex.Lock() - defer t.mutex.Unlock() if progress < 0 { progress = 0 } if progress > 100 { progress = 100 } + oldProgress := t.progress + callback := t.progressCallback t.progress = progress + t.mutex.Unlock() + + // Log progress change + if t.logger != nil && progress != oldProgress { + t.logger.LogProgress(progress, fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress)) + } + + // Call progress callback if set + if callback != nil && progress != oldProgress { + callback(progress) + } } // Cancel cancels the task func (t *BaseTask) Cancel() error { t.mutex.Lock() defer t.mutex.Unlock() + + if t.cancelled { + return nil + } + t.cancelled = true + + if t.logger != nil { + t.logger.LogStatus("cancelled", "Task cancelled by request") + t.logger.Warning("Task %s was cancelled", t.taskID) + } + return nil } @@ -72,6 +135,10 @@ func (t *BaseTask) SetStartTime(startTime time.Time) { t.mutex.Lock() defer t.mutex.Unlock() t.startTime = startTime + + if t.logger != nil { + t.logger.LogStatus("running", fmt.Sprintf("Task started at %s", startTime.Format(time.RFC3339))) + } } // GetStartTime returns the task start time @@ -86,6 +153,13 @@ func (t *BaseTask) SetEstimatedDuration(duration time.Duration) { t.mutex.Lock() defer t.mutex.Unlock() t.estimatedDuration = duration + + if t.logger != nil { + t.logger.LogWithFields("INFO", "Estimated duration set", map[string]interface{}{ + "estimated_duration": duration.String(), + "estimated_seconds": duration.Seconds(), + }) + } } // GetEstimatedDuration returns the estimated duration @@ -95,11 +169,115 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration { return t.estimatedDuration } -// ExecuteTask is a wrapper that handles common task execution logic +// SetProgressCallback sets the progress callback function +func (t *BaseTask) SetProgressCallback(callback func(float64)) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.progressCallback = callback +} + +// SetLoggerConfig sets the logger configuration for this task +func (t *BaseTask) SetLoggerConfig(config TaskLoggerConfig) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.loggerConfig = config +} + +// GetLogger returns the task logger +func (t *BaseTask) GetLogger() TaskLogger { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.logger +} + +// GetTaskLogger returns the task logger (LoggerProvider interface) +func (t *BaseTask) GetTaskLogger() TaskLogger { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.logger +} + +// LogInfo logs an info message +func (t *BaseTask) LogInfo(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Info(message, args...) + } +} + +// LogWarning logs a warning message +func (t *BaseTask) LogWarning(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Warning(message, args...) + } +} + +// LogError logs an error message +func (t *BaseTask) LogError(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Error(message, args...) + } +} + +// LogDebug logs a debug message +func (t *BaseTask) LogDebug(message string, args ...interface{}) { + if t.logger != nil { + t.logger.Debug(message, args...) + } +} + +// LogWithFields logs a message with structured fields +func (t *BaseTask) LogWithFields(level string, message string, fields map[string]interface{}) { + if t.logger != nil { + t.logger.LogWithFields(level, message, fields) + } +} + +// FinishTask finalizes the task and closes the logger +func (t *BaseTask) FinishTask(success bool, errorMsg string) error { + if t.logger != nil { + if success { + t.logger.LogStatus("completed", "Task completed successfully") + t.logger.Info("Task %s finished successfully", t.taskID) + } else { + t.logger.LogStatus("failed", fmt.Sprintf("Task failed: %s", errorMsg)) + t.logger.Error("Task %s failed: %s", t.taskID, errorMsg) + } + + // Close logger + if err := t.logger.Close(); err != nil { + glog.Errorf("Failed to close task logger: %v", err) + } + } + + return nil +} + +// ExecuteTask is a wrapper that handles common task execution logic with logging func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error { + // Initialize logger if not already done + if t.logger == nil { + // Generate a temporary task ID if none provided + if t.taskID == "" { + t.taskID = fmt.Sprintf("task_%d", time.Now().UnixNano()) + } + + workerID := "unknown" + if err := t.InitializeLogger(t.taskID, workerID, params); err != nil { + glog.Warningf("Failed to initialize task logger: %v", err) + } + } + t.SetStartTime(time.Now()) t.SetProgress(0) + if t.logger != nil { + t.logger.LogWithFields("INFO", "Task execution started", map[string]interface{}{ + "volume_id": params.VolumeID, + "server": params.Server, + "collection": params.Collection, + }) + } + // Create a context that can be cancelled ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -114,21 +292,29 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe // Check cancellation every second } } + t.LogWarning("Task cancellation detected, cancelling context") cancel() }() // Execute the actual task + t.LogInfo("Starting task executor") err := executor(ctx, params) if err != nil { + t.LogError("Task executor failed: %v", err) + t.FinishTask(false, err.Error()) return err } if t.IsCancelled() { + t.LogWarning("Task was cancelled during execution") + t.FinishTask(false, "cancelled") return context.Canceled } t.SetProgress(100) + t.LogInfo("Task executor completed successfully") + t.FinishTask(true, "") return nil } |
