aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/worker.go
diff options
context:
space:
mode:
author: Chris Lu <chrislusf@users.noreply.github.com> 2025-08-01 11:18:32 -0700
committer: GitHub <noreply@github.com> 2025-08-01 11:18:32 -0700
commit: 0975968e71b05368d5f28f788cf863c2042c2696 (patch)
tree: 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/worker.go
parent: 1cba609bfa2306cc2885df212febd5ff954aa693 (diff)
download: seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz
download: seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
Diffstat (limited to 'weed/worker/worker.go')
-rw-r--r-- weed/worker/worker.go 89
1 files changed, 41 insertions, 48 deletions
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index ff6b87808..2bc0e1e11 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -1,6 +1,7 @@
package worker
import (
+ "context"
"crypto/rand"
"fmt"
"net"
@@ -26,7 +27,7 @@ type Worker struct {
id string
config *types.WorkerConfig
registry *tasks.TaskRegistry
- currentTasks map[string]*types.Task
+ currentTasks map[string]*types.TaskInput
adminClient AdminClient
running bool
stopChan chan struct{}
@@ -43,9 +44,9 @@ type Worker struct {
type AdminClient interface {
Connect() error
Disconnect() error
- RegisterWorker(worker *types.Worker) error
+ RegisterWorker(worker *types.WorkerData) error
SendHeartbeat(workerID string, status *types.WorkerStatus) error
- RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error)
+ RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error)
CompleteTask(taskID string, success bool, errorMsg string) error
CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error
UpdateTaskProgress(taskID string, progress float64) error
@@ -139,8 +140,8 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) {
return nil, fmt.Errorf("failed to generate or load worker ID: %w", err)
}
- // Use the global registry that already has all tasks registered
- registry := tasks.GetGlobalRegistry()
+ // Use the global unified registry that already has all tasks registered
+ registry := tasks.GetGlobalTaskRegistry()
// Initialize task log handler
logDir := filepath.Join(config.BaseWorkingDir, "task_logs")
@@ -150,13 +151,13 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) {
id: workerID,
config: config,
registry: registry,
- currentTasks: make(map[string]*types.Task),
+ currentTasks: make(map[string]*types.TaskInput),
stopChan: make(chan struct{}),
startTime: time.Now(),
taskLogHandler: taskLogHandler,
}
- glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetSupportedTypes()))
+ glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetAll()))
return worker, nil
}
@@ -194,7 +195,7 @@ func (w *Worker) Start() error {
w.startTime = time.Now()
// Prepare worker info for registration
- workerInfo := &types.Worker{
+ workerInfo := &types.WorkerData{
ID: w.id,
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
@@ -293,7 +294,7 @@ func (w *Worker) GetStatus() types.WorkerStatus {
w.mutex.RLock()
defer w.mutex.RUnlock()
- var currentTasks []types.Task
+ var currentTasks []types.TaskInput
for _, task := range w.currentTasks {
currentTasks = append(currentTasks, *task)
}
@@ -318,7 +319,7 @@ func (w *Worker) GetStatus() types.WorkerStatus {
}
// HandleTask handles a task execution
-func (w *Worker) HandleTask(task *types.Task) error {
+func (w *Worker) HandleTask(task *types.TaskInput) error {
glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)",
w.id, task.ID, task.Type, task.VolumeID)
@@ -370,7 +371,7 @@ func (w *Worker) SetAdminClient(client AdminClient) {
}
// executeTask executes a task
-func (w *Worker) executeTask(task *types.Task) {
+func (w *Worker) executeTask(task *types.TaskInput) {
startTime := time.Now()
defer func() {
@@ -403,44 +404,35 @@ func (w *Worker) executeTask(task *types.Task) {
return
}
- // Use typed task execution (all tasks should be typed)
+ // Use new task execution system with unified Task interface
glog.V(1).Infof("Executing task %s with typed protobuf parameters", task.ID)
- typedRegistry := types.GetGlobalTypedTaskRegistry()
- typedTaskInstance, err := typedRegistry.CreateTypedTask(task.Type)
- if err != nil {
- w.completeTask(task.ID, false, fmt.Sprintf("typed task not available for %s: %v", task.Type, err))
- glog.Errorf("Worker %s failed to create typed task %s: %v", w.id, task.ID, err)
- return
- }
+ taskFactory := w.registry.Get(task.Type)
+ if taskFactory == nil {
+ w.completeTask(task.ID, false, fmt.Sprintf("task factory not available for %s: task type not found", task.Type))
+ glog.Errorf("Worker %s failed to get task factory for %s type %v", w.id, task.ID, task.Type)
- // Configure task logger directory (all typed tasks support this)
- tasksLoggerConfig := w.getTaskLoggerConfig()
- typedLoggerConfig := types.TaskLoggerConfig{
- BaseLogDir: tasksLoggerConfig.BaseLogDir,
- MaxTasks: tasksLoggerConfig.MaxTasks,
- MaxLogSizeMB: tasksLoggerConfig.MaxLogSizeMB,
- EnableConsole: tasksLoggerConfig.EnableConsole,
+ // Log supported task types for debugging
+ allFactories := w.registry.GetAll()
+ glog.Errorf("Available task types: %d", len(allFactories))
+ for taskType := range allFactories {
+ glog.Errorf("Supported task type: %v", taskType)
+ }
+ return
}
- typedTaskInstance.SetLoggerConfig(typedLoggerConfig)
- glog.V(2).Infof("Set typed task logger config for %s: %s", task.ID, typedLoggerConfig.BaseLogDir)
- // Initialize logging (all typed tasks support this)
- taskParams := types.TaskParams{
- VolumeID: task.VolumeID,
- Server: task.Server,
- Collection: task.Collection,
- WorkingDir: taskWorkingDir,
- TypedParams: task.TypedParams,
- GrpcDialOption: w.config.GrpcDialOption,
+ taskInstance, err := taskFactory.Create(task.TypedParams)
+ if err != nil {
+ w.completeTask(task.ID, false, fmt.Sprintf("failed to create task for %s: %v", task.Type, err))
+ glog.Errorf("Worker %s failed to create task %s type %v: %v", w.id, task.ID, task.Type, err)
+ return
}
- if err := typedTaskInstance.InitializeTaskLogger(task.ID, w.id, taskParams); err != nil {
- glog.Warningf("Failed to initialize task logger for %s: %v", task.ID, err)
- }
+ // Task execution uses the new unified Task interface
+ glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir)
// Set progress callback that reports to admin server
- typedTaskInstance.SetProgressCallback(func(progress float64) {
+ taskInstance.SetProgressCallback(func(progress float64) {
// Report progress updates to admin server
glog.V(2).Infof("Task %s progress: %.1f%%", task.ID, progress)
if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil {
@@ -448,18 +440,19 @@ func (w *Worker) executeTask(task *types.Task) {
}
})
- // Execute typed task
- err = typedTaskInstance.ExecuteTyped(task.TypedParams)
+ // Execute task with context
+ ctx := context.Background()
+ err = taskInstance.Execute(ctx, task.TypedParams)
// Report completion
if err != nil {
w.completeTask(task.ID, false, err.Error())
w.tasksFailed++
- glog.Errorf("Worker %s failed to execute typed task %s: %v", w.id, task.ID, err)
+ glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
} else {
w.completeTask(task.ID, true, "")
w.tasksCompleted++
- glog.Infof("Worker %s completed typed task %s successfully", w.id, task.ID)
+ glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
}
}
@@ -558,11 +551,11 @@ func (w *Worker) GetTaskRegistry() *tasks.TaskRegistry {
}
// GetCurrentTasks returns the current tasks
-func (w *Worker) GetCurrentTasks() map[string]*types.Task {
+func (w *Worker) GetCurrentTasks() map[string]*types.TaskInput {
w.mutex.RLock()
defer w.mutex.RUnlock()
- tasks := make(map[string]*types.Task)
+ tasks := make(map[string]*types.TaskInput)
for id, task := range w.currentTasks {
tasks[id] = task
}
@@ -571,7 +564,7 @@ func (w *Worker) GetCurrentTasks() map[string]*types.Task {
// registerWorker registers the worker with the admin server
func (w *Worker) registerWorker() {
- workerInfo := &types.Worker{
+ workerInfo := &types.WorkerData{
ID: w.id,
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
@@ -698,7 +691,7 @@ func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) {
w.id, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
// Convert to task and handle it
- task := &types.Task{
+ task := &types.TaskInput{
ID: taskAssign.TaskId,
Type: types.TaskType(taskAssign.TaskType),
Status: types.TaskStatusAssigned,