Diffstat (limited to 'weed/worker/worker.go')
| -rw-r--r-- | weed/worker/worker.go | 466 |
1 file changed, 430 insertions, 36 deletions
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index 3b7899f07..ff6b87808 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -1,12 +1,17 @@
 package worker

 import (
+    "crypto/rand"
     "fmt"
+    "net"
     "os"
+    "path/filepath"
+    "strings"
     "sync"
     "time"

     "github.com/seaweedfs/seaweedfs/weed/glog"
+    "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
     "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
     "github.com/seaweedfs/seaweedfs/weed/worker/types"
@@ -31,6 +36,7 @@ type Worker struct {
     tasksFailed     int
     heartbeatTicker *time.Ticker
     requestTicker   *time.Ticker
+    taskLogHandler  *tasks.TaskLogHandler
 }

 // AdminClient defines the interface for communicating with the admin server
@@ -41,30 +47,113 @@ type AdminClient interface {
     SendHeartbeat(workerID string, status *types.WorkerStatus) error
     RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error)
     CompleteTask(taskID string, success bool, errorMsg string) error
+    CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error
     UpdateTaskProgress(taskID string, progress float64) error
     IsConnected() bool
 }

+// GenerateOrLoadWorkerID generates a unique worker ID or loads existing one from working directory
+func GenerateOrLoadWorkerID(workingDir string) (string, error) {
+    const workerIDFile = "worker.id"
+
+    var idFilePath string
+    if workingDir != "" {
+        idFilePath = filepath.Join(workingDir, workerIDFile)
+    } else {
+        // Use current working directory if none specified
+        wd, err := os.Getwd()
+        if err != nil {
+            return "", fmt.Errorf("failed to get working directory: %w", err)
+        }
+        idFilePath = filepath.Join(wd, workerIDFile)
+    }
+
+    // Try to read existing worker ID
+    if data, err := os.ReadFile(idFilePath); err == nil {
+        workerID := strings.TrimSpace(string(data))
+        if workerID != "" {
+            glog.Infof("Loaded existing worker ID from %s: %s", idFilePath, workerID)
+            return workerID, nil
+        }
+    }
+
+    // Generate new unique worker ID with host information
+    hostname, _ := os.Hostname()
+    if hostname == "" {
+        hostname = "unknown"
+    }
+
+    // Get local IP address for better host identification
+    var hostIP string
+    if addrs, err := net.InterfaceAddrs(); err == nil {
+        for _, addr := range addrs {
+            if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+                if ipnet.IP.To4() != nil {
+                    hostIP = ipnet.IP.String()
+                    break
+                }
+            }
+        }
+    }
+    if hostIP == "" {
+        hostIP = "noip"
+    }
+
+    // Create host identifier combining hostname and IP
+    hostID := fmt.Sprintf("%s@%s", hostname, hostIP)
+
+    // Generate random component for uniqueness
+    randomBytes := make([]byte, 4)
+    var workerID string
+    if _, err := rand.Read(randomBytes); err != nil {
+        // Fallback to timestamp if crypto/rand fails
+        workerID = fmt.Sprintf("worker-%s-%d", hostID, time.Now().Unix())
+        glog.Infof("Generated fallback worker ID: %s", workerID)
+    } else {
+        // Use random bytes + timestamp for uniqueness
+        randomHex := fmt.Sprintf("%x", randomBytes)
+        timestamp := time.Now().Unix()
+        workerID = fmt.Sprintf("worker-%s-%s-%d", hostID, randomHex, timestamp)
+        glog.Infof("Generated new worker ID: %s", workerID)
+    }
+
+    // Save worker ID to file
+    if err := os.WriteFile(idFilePath, []byte(workerID), 0644); err != nil {
+        glog.Warningf("Failed to save worker ID to %s: %v", idFilePath, err)
+    } else {
+        glog.Infof("Saved worker ID to %s", idFilePath)
+    }
+
+    return workerID, nil
+}
+
 // NewWorker creates a new worker instance
 func NewWorker(config *types.WorkerConfig) (*Worker, error) {
     if config == nil {
         config = types.DefaultWorkerConfig()
     }

-    // Always auto-generate worker ID
-    hostname, _ := os.Hostname()
-    workerID := fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix())
+    // Generate or load persistent worker ID
+    workerID, err := GenerateOrLoadWorkerID(config.BaseWorkingDir)
+    if err != nil {
+        return nil, fmt.Errorf("failed to generate or load worker ID: %w", err)
+    }

     // Use the global registry that already has all tasks registered
     registry := tasks.GetGlobalRegistry()

+    // Initialize task log handler
+    logDir := filepath.Join(config.BaseWorkingDir, "task_logs")
+    taskLogHandler := tasks.NewTaskLogHandler(logDir)
+
     worker := &Worker{
-        id:           workerID,
-        config:       config,
-        registry:     registry,
-        currentTasks: make(map[string]*types.Task),
-        stopChan:     make(chan struct{}),
-        startTime:    time.Now(),
+        id:             workerID,
+        config:         config,
+        registry:       registry,
+        currentTasks:   make(map[string]*types.Task),
+        stopChan:       make(chan struct{}),
+        startTime:      time.Now(),
+        taskLogHandler: taskLogHandler,
     }

     glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetSupportedTypes()))
@@ -72,6 +161,17 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) {
     return worker, nil
 }

+// getTaskLoggerConfig returns the task logger configuration with worker's log directory
+func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig {
+    config := tasks.DefaultTaskLoggerConfig()
+
+    // Use worker's configured log directory (BaseWorkingDir is guaranteed to be non-empty)
+    logDir := filepath.Join(w.config.BaseWorkingDir, "task_logs")
+    config.BaseLogDir = logDir
+
+    return config
+}
+
 // ID returns the worker ID
 func (w *Worker) ID() string {
     return w.id
@@ -90,15 +190,10 @@ func (w *Worker) Start() error {
         return fmt.Errorf("admin client is not set")
     }

-    // Connect to admin server
-    if err := w.adminClient.Connect(); err != nil {
-        return fmt.Errorf("failed to connect to admin server: %w", err)
-    }
-
     w.running = true
     w.startTime = time.Now()

-    // Register with admin server
+    // Prepare worker info for registration
     workerInfo := &types.Worker{
         ID:            w.id,
         Capabilities:  w.config.Capabilities,
@@ -108,17 +203,33 @@ func (w *Worker) Start() error {
         LastHeartbeat: time.Now(),
     }

+    // Register worker info with client first (this stores it for use during connection)
     if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
-        w.running = false
-        w.adminClient.Disconnect()
-        return fmt.Errorf("failed to register worker: %w", err)
+        glog.V(1).Infof("Worker info stored for registration: %v", err)
+        // This is expected if not connected yet
     }

-    // Start worker loops
+    // Start connection attempt (will register immediately if successful)
+    glog.Infof("🚀 WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d",
+        w.id, w.config.Capabilities, w.config.MaxConcurrent)
+
+    // Try initial connection, but don't fail if it doesn't work immediately
+    if err := w.adminClient.Connect(); err != nil {
+        glog.Warningf("⚠️ INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err)
+        // Don't return error - let the reconnection loop handle it
+    } else {
+        glog.Infof("✅ INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id)
+    }
+
+    // Start worker loops regardless of initial connection status
+    // They will handle connection failures gracefully
+    glog.V(1).Infof("🔄 STARTING LOOPS: Worker %s starting background loops", w.id)
     go w.heartbeatLoop()
     go w.taskRequestLoop()
+    go w.connectionMonitorLoop()
+    go w.messageProcessingLoop()

-    glog.Infof("Worker %s started", w.id)
+    glog.Infof("✅ WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id)
     return nil
 }
@@ -208,14 +319,25 @@ func (w *Worker) GetStatus() types.WorkerStatus {

 // HandleTask handles a task execution
 func (w *Worker) HandleTask(task *types.Task) error {
+    glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)",
+        w.id, task.ID, task.Type, task.VolumeID)
+
     w.mutex.Lock()
-    if len(w.currentTasks) >= w.config.MaxConcurrent {
+    currentLoad := len(w.currentTasks)
+    if currentLoad >= w.config.MaxConcurrent {
         w.mutex.Unlock()
+        glog.Errorf("❌ TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
+            w.id, currentLoad, w.config.MaxConcurrent, task.ID)
         return fmt.Errorf("worker is at capacity")
     }
+
     w.currentTasks[task.ID] = task
+    newLoad := len(w.currentTasks)
     w.mutex.Unlock()

+    glog.Infof("✅ TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
+        w.id, task.ID, newLoad, w.config.MaxConcurrent)
+
     // Execute task in goroutine
     go w.executeTask(task)
@@ -249,40 +371,95 @@ func (w *Worker) SetAdminClient(client AdminClient) {

 // executeTask executes a task
 func (w *Worker) executeTask(task *types.Task) {
+    startTime := time.Now()
+
     defer func() {
         w.mutex.Lock()
         delete(w.currentTasks, task.ID)
+        currentLoad := len(w.currentTasks)
         w.mutex.Unlock()
+
+        duration := time.Since(startTime)
+        glog.Infof("🏁 TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d",
+            w.id, task.ID, duration, currentLoad, w.config.MaxConcurrent)
     }()

-    glog.Infof("Worker %s executing task %s: %s", w.id, task.ID, task.Type)
+    glog.Infof("🚀 TASK EXECUTION STARTED: Worker %s starting execution of task %s (type: %s, volume: %d, server: %s, collection: %s) at %v",
+        w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339))

-    // Create task instance
-    taskParams := types.TaskParams{
-        VolumeID:   task.VolumeID,
-        Server:     task.Server,
-        Collection: task.Collection,
-        Parameters: task.Parameters,
+    // Report task start to admin server
+    if err := w.adminClient.UpdateTaskProgress(task.ID, 0.0); err != nil {
+        glog.V(1).Infof("Failed to report task start to admin: %v", err)
     }

-    taskInstance, err := w.registry.CreateTask(task.Type, taskParams)
+    // Determine task-specific working directory (BaseWorkingDir is guaranteed to be non-empty)
+    taskWorkingDir := filepath.Join(w.config.BaseWorkingDir, string(task.Type))
+    glog.V(2).Infof("📁 WORKING DIRECTORY: Task %s using working directory: %s", task.ID, taskWorkingDir)
+
+    // Check if we have typed protobuf parameters
+    if task.TypedParams == nil {
+        w.completeTask(task.ID, false, "task has no typed parameters - task was not properly planned")
+        glog.Errorf("Worker %s rejecting task %s: no typed parameters", w.id, task.ID)
+        return
+    }
+
+    // Use typed task execution (all tasks should be typed)
+    glog.V(1).Infof("Executing task %s with typed protobuf parameters", task.ID)
+
+    typedRegistry := types.GetGlobalTypedTaskRegistry()
+    typedTaskInstance, err := typedRegistry.CreateTypedTask(task.Type)
     if err != nil {
-        w.completeTask(task.ID, false, fmt.Sprintf("failed to create task: %v", err))
+        w.completeTask(task.ID, false, fmt.Sprintf("typed task not available for %s: %v", task.Type, err))
+        glog.Errorf("Worker %s failed to create typed task %s: %v", w.id, task.ID, err)
         return
     }

-    // Execute task
-    err = taskInstance.Execute(taskParams)
+    // Configure task logger directory (all typed tasks support this)
+    tasksLoggerConfig := w.getTaskLoggerConfig()
+    typedLoggerConfig := types.TaskLoggerConfig{
+        BaseLogDir:    tasksLoggerConfig.BaseLogDir,
+        MaxTasks:      tasksLoggerConfig.MaxTasks,
+        MaxLogSizeMB:  tasksLoggerConfig.MaxLogSizeMB,
+        EnableConsole: tasksLoggerConfig.EnableConsole,
+    }
+    typedTaskInstance.SetLoggerConfig(typedLoggerConfig)
+    glog.V(2).Infof("Set typed task logger config for %s: %s", task.ID, typedLoggerConfig.BaseLogDir)
+
+    // Initialize logging (all typed tasks support this)
+    taskParams := types.TaskParams{
+        VolumeID:       task.VolumeID,
+        Server:         task.Server,
+        Collection:     task.Collection,
+        WorkingDir:     taskWorkingDir,
+        TypedParams:    task.TypedParams,
+        GrpcDialOption: w.config.GrpcDialOption,
+    }
+
+    if err := typedTaskInstance.InitializeTaskLogger(task.ID, w.id, taskParams); err != nil {
+        glog.Warningf("Failed to initialize task logger for %s: %v", task.ID, err)
+    }
+
+    // Set progress callback that reports to admin server
+    typedTaskInstance.SetProgressCallback(func(progress float64) {
+        // Report progress updates to admin server
+        glog.V(2).Infof("Task %s progress: %.1f%%", task.ID, progress)
+        if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil {
+            glog.V(1).Infof("Failed to report task progress to admin: %v", err)
+        }
+    })
+
+    // Execute typed task
+    err = typedTaskInstance.ExecuteTyped(task.TypedParams)

     // Report completion
     if err != nil {
         w.completeTask(task.ID, false, err.Error())
         w.tasksFailed++
-        glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
+        glog.Errorf("Worker %s failed to execute typed task %s: %v", w.id, task.ID, err)
     } else {
         w.completeTask(task.ID, true, "")
         w.tasksCompleted++
-        glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
+        glog.Infof("Worker %s completed typed task %s successfully", w.id, task.ID)
     }
 }
@@ -348,20 +525,29 @@ func (w *Worker) requestTasks() {
     w.mutex.RUnlock()

     if currentLoad >= w.config.MaxConcurrent {
+        glog.V(3).Infof("🚫 TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)",
+            w.id, currentLoad, w.config.MaxConcurrent)
         return // Already at capacity
     }

     if w.adminClient != nil {
+        glog.V(3).Infof("📞 REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)",
+            w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities)
+
         task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities)
         if err != nil {
-            glog.V(2).Infof("Failed to request task: %v", err)
+            glog.V(2).Infof("❌ TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err)
             return
         }

         if task != nil {
+            glog.Infof("📨 TASK RESPONSE RECEIVED: Worker %s received task from admin server - ID: %s, Type: %s",
+                w.id, task.ID, task.Type)
             if err := w.HandleTask(task); err != nil {
-                glog.Errorf("Failed to handle task: %v", err)
+                glog.Errorf("❌ TASK HANDLING FAILED: Worker %s failed to handle task %s: %v", w.id, task.ID, err)
             }
+        } else {
+            glog.V(3).Infof("📭 NO TASK AVAILABLE: Worker %s - admin server has no tasks available", w.id)
         }
     }
 }
@@ -383,6 +569,59 @@ func (w *Worker) GetCurrentTasks() map[string]*types.Task {
     return tasks
 }

+// registerWorker registers the worker with the admin server
+func (w *Worker) registerWorker() {
+    workerInfo := &types.Worker{
+        ID:            w.id,
+        Capabilities:  w.config.Capabilities,
+        MaxConcurrent: w.config.MaxConcurrent,
+        Status:        "active",
+        CurrentLoad:   0,
+        LastHeartbeat: time.Now(),
+    }
+
+    if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
+        glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err)
+    } else {
+        glog.Infof("Worker %s registered successfully with admin server", w.id)
+    }
+}
+
+// connectionMonitorLoop monitors connection status
+func (w *Worker) connectionMonitorLoop() {
+    glog.V(1).Infof("🔍 CONNECTION MONITOR STARTED: Worker %s connection monitor loop started", w.id)
+    ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
+    defer ticker.Stop()
+
+    lastConnectionStatus := false
+
+    for {
+        select {
+        case <-w.stopChan:
+            glog.V(1).Infof("🛑 CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id)
+            return
+        case <-ticker.C:
+            // Monitor connection status and log changes
+            currentConnectionStatus := w.adminClient != nil && w.adminClient.IsConnected()
+
+            if currentConnectionStatus != lastConnectionStatus {
+                if currentConnectionStatus {
+                    glog.Infof("🔗 CONNECTION RESTORED: Worker %s connection status changed: connected", w.id)
+                } else {
+                    glog.Warningf("⚠️ CONNECTION LOST: Worker %s connection status changed: disconnected", w.id)
+                }
+                lastConnectionStatus = currentConnectionStatus
+            } else {
+                if currentConnectionStatus {
+                    glog.V(3).Infof("✅ CONNECTION OK: Worker %s connection status: connected", w.id)
+                } else {
+                    glog.V(1).Infof("🔌 CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id)
+                }
+            }
+        }
+    }
+}
+
 // GetConfig returns the worker configuration
 func (w *Worker) GetConfig() *types.WorkerConfig {
     return w.config
@@ -408,3 +647,158 @@ func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
         SuccessRate: successRate,
     }
 }
+
+// messageProcessingLoop processes incoming admin messages
+func (w *Worker) messageProcessingLoop() {
+    glog.Infof("🔄 MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)
+
+    // Get access to the incoming message channel from gRPC client
+    grpcClient, ok := w.adminClient.(*GrpcAdminClient)
+    if !ok {
+        glog.Warningf("⚠️ MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id)
+        return
+    }
+
+    incomingChan := grpcClient.GetIncomingChannel()
+    glog.V(1).Infof("📡 MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id)
+
+    for {
+        select {
+        case <-w.stopChan:
+            glog.Infof("🛑 MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id)
+            return
+        case message := <-incomingChan:
+            if message != nil {
+                glog.V(3).Infof("📥 MESSAGE PROCESSING: Worker %s processing incoming message", w.id)
+                w.processAdminMessage(message)
+            } else {
+                glog.V(3).Infof("📭 NULL MESSAGE: Worker %s received nil message", w.id)
+            }
+        }
+    }
+}
+
+// processAdminMessage processes different types of admin messages
+func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) {
+    glog.V(4).Infof("📫 ADMIN MESSAGE RECEIVED: Worker %s received admin message: %T", w.id, message.Message)
+
+    switch msg := message.Message.(type) {
+    case *worker_pb.AdminMessage_RegistrationResponse:
+        glog.V(2).Infof("✅ REGISTRATION RESPONSE: Worker %s received registration response", w.id)
+        w.handleRegistrationResponse(msg.RegistrationResponse)
+    case *worker_pb.AdminMessage_HeartbeatResponse:
+        glog.V(3).Infof("💓 HEARTBEAT RESPONSE: Worker %s received heartbeat response", w.id)
+        w.handleHeartbeatResponse(msg.HeartbeatResponse)
+    case *worker_pb.AdminMessage_TaskLogRequest:
+        glog.V(1).Infof("📋 TASK LOG REQUEST: Worker %s received task log request for task %s", w.id, msg.TaskLogRequest.TaskId)
+        w.handleTaskLogRequest(msg.TaskLogRequest)
+    case *worker_pb.AdminMessage_TaskAssignment:
+        taskAssign := msg.TaskAssignment
+        glog.V(1).Infof("Worker %s received direct task assignment %s (type: %s, volume: %d)",
+            w.id, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
+
+        // Convert to task and handle it
+        task := &types.Task{
+            ID:          taskAssign.TaskId,
+            Type:        types.TaskType(taskAssign.TaskType),
+            Status:      types.TaskStatusAssigned,
+            VolumeID:    taskAssign.Params.VolumeId,
+            Server:      taskAssign.Params.Server,
+            Collection:  taskAssign.Params.Collection,
+            Priority:    types.TaskPriority(taskAssign.Priority),
+            CreatedAt:   time.Unix(taskAssign.CreatedTime, 0),
+            TypedParams: taskAssign.Params,
+        }
+
+        if err := w.HandleTask(task); err != nil {
+            glog.Errorf("❌ DIRECT TASK ASSIGNMENT FAILED: Worker %s failed to handle direct task assignment %s: %v", w.id, task.ID, err)
+        }
+    case *worker_pb.AdminMessage_TaskCancellation:
+        glog.Infof("🛑 TASK CANCELLATION: Worker %s received task cancellation for task %s", w.id, msg.TaskCancellation.TaskId)
+        w.handleTaskCancellation(msg.TaskCancellation)
+    case *worker_pb.AdminMessage_AdminShutdown:
+        glog.Infof("🔄 ADMIN SHUTDOWN: Worker %s received admin shutdown message", w.id)
+        w.handleAdminShutdown(msg.AdminShutdown)
+    default:
+        glog.V(1).Infof("❓ UNKNOWN MESSAGE: Worker %s received unknown admin message type: %T", w.id, message.Message)
+    }
+}
+
+// handleTaskLogRequest processes task log requests from admin server
+func (w *Worker) handleTaskLogRequest(request *worker_pb.TaskLogRequest) {
+    glog.V(1).Infof("Worker %s handling task log request for task %s", w.id, request.TaskId)
+
+    // Use the task log handler to process the request
+    response := w.taskLogHandler.HandleLogRequest(request)
+
+    // Send response back to admin server
+    responseMsg := &worker_pb.WorkerMessage{
+        WorkerId:  w.id,
+        Timestamp: time.Now().Unix(),
+        Message: &worker_pb.WorkerMessage_TaskLogResponse{
+            TaskLogResponse: response,
+        },
+    }
+
+    grpcClient, ok := w.adminClient.(*GrpcAdminClient)
+    if !ok {
+        glog.Errorf("Cannot send task log response: admin client is not gRPC client")
+        return
+    }
+
+    select {
+    case grpcClient.outgoing <- responseMsg:
+        glog.V(1).Infof("Task log response sent for task %s", request.TaskId)
+    case <-time.After(5 * time.Second):
+        glog.Errorf("Failed to send task log response for task %s: timeout", request.TaskId)
+    }
+}
+
+// handleTaskCancellation processes task cancellation requests
+func (w *Worker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) {
+    glog.Infof("Worker %s received task cancellation for task %s", w.id, cancellation.TaskId)
+
+    w.mutex.Lock()
+    defer w.mutex.Unlock()
+
+    if task, exists := w.currentTasks[cancellation.TaskId]; exists {
+        // TODO: Implement task cancellation logic
+        glog.Infof("Cancelling task %s", task.ID)
+    } else {
+        glog.Warningf("Cannot cancel task %s: task not found", cancellation.TaskId)
+    }
+}
+
+// handleAdminShutdown processes admin shutdown notifications
+func (w *Worker) handleAdminShutdown(shutdown *worker_pb.AdminShutdown) {
+    glog.Infof("Worker %s received admin shutdown notification: %s", w.id, shutdown.Reason)
+
+    gracefulSeconds := shutdown.GracefulShutdownSeconds
+    if gracefulSeconds > 0 {
+        glog.Infof("Graceful shutdown in %d seconds", gracefulSeconds)
+        time.AfterFunc(time.Duration(gracefulSeconds)*time.Second, func() {
+            w.Stop()
+        })
+    } else {
+        // Immediate shutdown
+        go w.Stop()
+    }
+}
+
+// handleRegistrationResponse processes registration response from admin server
+func (w *Worker) handleRegistrationResponse(response *worker_pb.RegistrationResponse) {
+    glog.V(2).Infof("Worker %s processed registration response: success=%v", w.id, response.Success)
+    if !response.Success {
+        glog.Warningf("Worker %s registration failed: %s", w.id, response.Message)
+    }
+    // Registration responses are typically handled by the gRPC client during connection setup
+    // No additional action needed here
+}
+
+// handleHeartbeatResponse processes heartbeat response from admin server
+func (w *Worker) handleHeartbeatResponse(response *worker_pb.HeartbeatResponse) {
+    glog.V(4).Infof("Worker %s processed heartbeat response", w.id)
+    // Heartbeat responses are mainly for keeping the connection alive
+    // The admin may include configuration updates or status information in the future
+    // For now, just acknowledge receipt
+}
