diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-09 21:47:29 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-09 21:47:29 -0700 |
| commit | 25bbf4c3d44b1c8a9aa4980e37ed399ec249f771 (patch) | |
| tree | aabb2ce3c6f55e4cf1e26ce2b6989086c17830bc /weed/worker/tasks/task.go | |
| parent | 3ac2a2e22d863753a6b568596fbe9d76d03023b5 (diff) | |
| download | seaweedfs-25bbf4c3d44b1c8a9aa4980e37ed399ec249f771.tar.xz seaweedfs-25bbf4c3d44b1c8a9aa4980e37ed399ec249f771.zip | |
Admin UI: Fetch task logs (#7114)
* show task details
* loading tasks
* task UI works
* generic rendering
* rendering the export link
* removing placementConflicts from task parameters
* remove TaskSourceLocation
* remove "Server ID" column
* rendering balance task source
* sources and targets
* fix ec task generation
* move info
* render timeline
* simplified worker id
* simplify
* read task logs from worker
* isValidTaskID
* address comments
* Update weed/worker/tasks/balance/execution.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/worker/tasks/erasure_coding/ec_task.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/worker/tasks/task_log_handler.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix shard ids
* plan distributing shard id
* rendering planned shards in task details
* remove Conflicts
* worker logs correctly
* pass in dc and rack
* task logging
* Update weed/admin/maintenance/maintenance_queue.go
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* display log details
* logs have fields now
* sort field keys
* fix link
* fix collection filtering
* avoid hard coded ec shard counts
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Diffstat (limited to 'weed/worker/tasks/task.go')
| -rw-r--r-- | weed/worker/tasks/task.go | 66 |
1 files changed, 60 insertions, 6 deletions
diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go index 9813ae97f..f3eed8b2d 100644 --- a/weed/worker/tasks/task.go +++ b/weed/worker/tasks/task.go @@ -7,6 +7,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -21,7 +22,8 @@ type BaseTask struct { estimatedDuration time.Duration logger TaskLogger loggerConfig TaskLoggerConfig - progressCallback func(float64) // Callback function for progress updates + progressCallback func(float64, string) // Callback function for progress updates + currentStage string // Current stage description } // NewBaseTask creates a new base task @@ -90,20 +92,64 @@ func (t *BaseTask) SetProgress(progress float64) { } oldProgress := t.progress callback := t.progressCallback + stage := t.currentStage t.progress = progress t.mutex.Unlock() // Log progress change if t.logger != nil && progress != oldProgress { - t.logger.LogProgress(progress, fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress)) + message := stage + if message == "" { + message = fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress) + } + t.logger.LogProgress(progress, message) } // Call progress callback if set if callback != nil && progress != oldProgress { - callback(progress) + callback(progress, stage) } } +// SetProgressWithStage sets the current progress with a stage description +func (t *BaseTask) SetProgressWithStage(progress float64, stage string) { + t.mutex.Lock() + if progress < 0 { + progress = 0 + } + if progress > 100 { + progress = 100 + } + callback := t.progressCallback + t.progress = progress + t.currentStage = stage + t.mutex.Unlock() + + // Log progress change + if t.logger != nil { + t.logger.LogProgress(progress, stage) + } + + // Call progress callback if set + if callback != nil { + callback(progress, stage) + } +} + +// SetCurrentStage sets the current stage description +func (t *BaseTask) SetCurrentStage(stage string) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.currentStage = stage +} + +// GetCurrentStage returns the current stage description +func (t *BaseTask) GetCurrentStage() string { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.currentStage +} + // Cancel cancels the task func (t *BaseTask) Cancel() error { t.mutex.Lock() @@ -170,7 +216,7 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration { } // SetProgressCallback sets the progress callback function -func (t *BaseTask) SetProgressCallback(callback func(float64)) { +func (t *BaseTask) SetProgressCallback(callback func(float64, string)) { t.mutex.Lock() defer t.mutex.Unlock() t.progressCallback = callback @@ -273,7 +319,7 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe if t.logger != nil { t.logger.LogWithFields("INFO", "Task execution started", map[string]interface{}{ "volume_id": params.VolumeID, - "server": params.Server, + "server": getServerFromSources(params.TypedParams.Sources), "collection": params.Collection, }) } @@ -362,7 +408,7 @@ func ValidateParams(params types.TaskParams, requiredFields ...string) error { return &ValidationError{Field: field, Message: "volume_id is required"} } case "server": - if params.Server == "" { + if len(params.TypedParams.Sources) == 0 { return &ValidationError{Field: field, Message: "server is required"} } case "collection": @@ -383,3 +429,11 @@ type ValidationError struct { func (e *ValidationError) Error() string { return e.Field + ": " + e.Message } + +// getServerFromSources extracts the server address from unified sources +func getServerFromSources(sources []*worker_pb.TaskSource) string { + if len(sources) > 0 { + return sources[0].Node + } + return "" +} |
