diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-01 11:18:32 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-01 11:18:32 -0700 |
| commit | 0975968e71b05368d5f28f788cf863c2042c2696 (patch) | |
| tree | 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/worker.go | |
| parent | 1cba609bfa2306cc2885df212febd5ff954aa693 (diff) | |
| download | seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip | |
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
Diffstat (limited to 'weed/worker/worker.go')
| -rw-r--r-- | weed/worker/worker.go | 89 |
1 files changed, 41 insertions, 48 deletions
diff --git a/weed/worker/worker.go b/weed/worker/worker.go index ff6b87808..2bc0e1e11 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -1,6 +1,7 @@ package worker import ( + "context" "crypto/rand" "fmt" "net" @@ -26,7 +27,7 @@ type Worker struct { id string config *types.WorkerConfig registry *tasks.TaskRegistry - currentTasks map[string]*types.Task + currentTasks map[string]*types.TaskInput adminClient AdminClient running bool stopChan chan struct{} @@ -43,9 +44,9 @@ type Worker struct { type AdminClient interface { Connect() error Disconnect() error - RegisterWorker(worker *types.Worker) error + RegisterWorker(worker *types.WorkerData) error SendHeartbeat(workerID string, status *types.WorkerStatus) error - RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) + RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) CompleteTask(taskID string, success bool, errorMsg string) error CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error UpdateTaskProgress(taskID string, progress float64) error @@ -139,8 +140,8 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) { return nil, fmt.Errorf("failed to generate or load worker ID: %w", err) } - // Use the global registry that already has all tasks registered - registry := tasks.GetGlobalRegistry() + // Use the global unified registry that already has all tasks registered + registry := tasks.GetGlobalTaskRegistry() // Initialize task log handler logDir := filepath.Join(config.BaseWorkingDir, "task_logs") @@ -150,13 +151,13 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) { id: workerID, config: config, registry: registry, - currentTasks: make(map[string]*types.Task), + currentTasks: make(map[string]*types.TaskInput), stopChan: make(chan struct{}), startTime: time.Now(), taskLogHandler: taskLogHandler, } - glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetSupportedTypes())) + glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetAll())) return worker, nil } @@ -194,7 +195,7 @@ func (w *Worker) Start() error { w.startTime = time.Now() // Prepare worker info for registration - workerInfo := &types.Worker{ + workerInfo := &types.WorkerData{ ID: w.id, Capabilities: w.config.Capabilities, MaxConcurrent: w.config.MaxConcurrent, @@ -293,7 +294,7 @@ func (w *Worker) GetStatus() types.WorkerStatus { w.mutex.RLock() defer w.mutex.RUnlock() - var currentTasks []types.Task + var currentTasks []types.TaskInput for _, task := range w.currentTasks { currentTasks = append(currentTasks, *task) } @@ -318,7 +319,7 @@ func (w *Worker) GetStatus() types.WorkerStatus { } // HandleTask handles a task execution -func (w *Worker) HandleTask(task *types.Task) error { +func (w *Worker) HandleTask(task *types.TaskInput) error { glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)", w.id, task.ID, task.Type, task.VolumeID) @@ -370,7 +371,7 @@ func (w *Worker) SetAdminClient(client AdminClient) { } // executeTask executes a task -func (w *Worker) executeTask(task *types.Task) { +func (w *Worker) executeTask(task *types.TaskInput) { startTime := time.Now() defer func() { @@ -403,44 +404,35 @@ func (w *Worker) executeTask(task *types.Task) { return } - // Use typed task execution (all tasks should be typed) + // Use new task execution system with unified Task interface glog.V(1).Infof("Executing task %s with typed protobuf parameters", task.ID) - typedRegistry := types.GetGlobalTypedTaskRegistry() - typedTaskInstance, err := typedRegistry.CreateTypedTask(task.Type) - if err != nil { - w.completeTask(task.ID, false, fmt.Sprintf("typed task not available for %s: %v", task.Type, err)) - glog.Errorf("Worker %s failed to create typed task %s: %v", w.id, task.ID, err) - return - } + taskFactory := w.registry.Get(task.Type) + if taskFactory == nil { + w.completeTask(task.ID, false, fmt.Sprintf("task factory not available for %s: task type not found", task.Type)) + glog.Errorf("Worker %s failed to get task factory for %s type %v", w.id, task.ID, task.Type) - // Configure task logger directory (all typed tasks support this) - tasksLoggerConfig := w.getTaskLoggerConfig() - typedLoggerConfig := types.TaskLoggerConfig{ - BaseLogDir: tasksLoggerConfig.BaseLogDir, - MaxTasks: tasksLoggerConfig.MaxTasks, - MaxLogSizeMB: tasksLoggerConfig.MaxLogSizeMB, - EnableConsole: tasksLoggerConfig.EnableConsole, + // Log supported task types for debugging + allFactories := w.registry.GetAll() + glog.Errorf("Available task types: %d", len(allFactories)) + for taskType := range allFactories { + glog.Errorf("Supported task type: %v", taskType) + } + return } - typedTaskInstance.SetLoggerConfig(typedLoggerConfig) - glog.V(2).Infof("Set typed task logger config for %s: %s", task.ID, typedLoggerConfig.BaseLogDir) - // Initialize logging (all typed tasks support this) - taskParams := types.TaskParams{ - VolumeID: task.VolumeID, - Server: task.Server, - Collection: task.Collection, - WorkingDir: taskWorkingDir, - TypedParams: task.TypedParams, - GrpcDialOption: w.config.GrpcDialOption, + taskInstance, err := taskFactory.Create(task.TypedParams) + if err != nil { + w.completeTask(task.ID, false, fmt.Sprintf("failed to create task for %s: %v", task.Type, err)) + glog.Errorf("Worker %s failed to create task %s type %v: %v", w.id, task.ID, task.Type, err) + return } - if err := typedTaskInstance.InitializeTaskLogger(task.ID, w.id, taskParams); err != nil { - glog.Warningf("Failed to initialize task logger for %s: %v", task.ID, err) - } + // Task execution uses the new unified Task interface + glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir) // Set progress callback that reports to admin server - typedTaskInstance.SetProgressCallback(func(progress float64) { + taskInstance.SetProgressCallback(func(progress float64) { // Report progress updates to admin server glog.V(2).Infof("Task %s progress: %.1f%%", task.ID, progress) if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil { @@ -448,18 +440,19 @@ func (w *Worker) executeTask(task *types.Task) { } }) - // Execute typed task - err = typedTaskInstance.ExecuteTyped(task.TypedParams) + // Execute task with context + ctx := context.Background() + err = taskInstance.Execute(ctx, task.TypedParams) // Report completion if err != nil { w.completeTask(task.ID, false, err.Error()) w.tasksFailed++ - glog.Errorf("Worker %s failed to execute typed task %s: %v", w.id, task.ID, err) + glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err) } else { w.completeTask(task.ID, true, "") w.tasksCompleted++ - glog.Infof("Worker %s completed typed task %s successfully", w.id, task.ID) + glog.Infof("Worker %s completed task %s successfully", w.id, task.ID) } } @@ -558,11 +551,11 @@ func (w *Worker) GetTaskRegistry() *tasks.TaskRegistry { } // GetCurrentTasks returns the current tasks -func (w *Worker) GetCurrentTasks() map[string]*types.Task { +func (w *Worker) GetCurrentTasks() map[string]*types.TaskInput { w.mutex.RLock() defer w.mutex.RUnlock() - tasks := make(map[string]*types.Task) + tasks := make(map[string]*types.TaskInput) for id, task := range w.currentTasks { tasks[id] = task } @@ -571,7 +564,7 @@ func (w *Worker) GetCurrentTasks() map[string]*types.Task { // registerWorker registers the worker with the admin server func (w *Worker) registerWorker() { - workerInfo := &types.Worker{ + workerInfo := &types.WorkerData{ ID: w.id, Capabilities: w.config.Capabilities, MaxConcurrent: w.config.MaxConcurrent, @@ -698,7 +691,7 @@ func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) { w.id, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId) // Convert to task and handle it - task := &types.Task{ + task := &types.TaskInput{ ID: taskAssign.TaskId, Type: types.TaskType(taskAssign.TaskType), Status: types.TaskStatusAssigned, |
