diff options
Diffstat (limited to 'weed/worker/client.go')
| -rw-r--r-- | weed/worker/client.go | 307 |
1 files changed, 241 insertions, 66 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go index 60b33fb31..53854c6e3 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -80,6 +80,21 @@ func (c *GrpcAdminClient) Connect() error { return fmt.Errorf("already connected") } + // Always start the reconnection loop, even if initial connection fails + go c.reconnectionLoop() + + // Attempt initial connection + err := c.attemptConnection() + if err != nil { + glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err) + return err + } + + return nil +} + +// attemptConnection tries to establish the connection without managing the reconnection loop +func (c *GrpcAdminClient) attemptConnection() error { // Detect TLS support and create appropriate connection conn, err := c.createConnection() if err != nil { @@ -100,10 +115,34 @@ func (c *GrpcAdminClient) Connect() error { c.stream = stream c.connected = true - // Start stream handlers and reconnection loop - go c.handleOutgoing() - go c.handleIncoming() - go c.reconnectionLoop() + // Always check for worker info and send registration immediately as the very first message + c.mutex.RLock() + workerInfo := c.lastWorkerInfo + c.mutex.RUnlock() + + if workerInfo != nil { + // Send registration synchronously as the very first message + if err := c.sendRegistrationSync(workerInfo); err != nil { + c.conn.Close() + c.connected = false + return fmt.Errorf("failed to register worker: %w", err) + } + glog.Infof("Worker registered successfully with admin server") + } else { + // No worker info yet - stream will wait for registration + glog.V(1).Infof("Connected to admin server, waiting for worker registration info") + } + + // Start stream handlers with synchronization + outgoingReady := make(chan struct{}) + incomingReady := make(chan struct{}) + + go c.handleOutgoingWithReady(outgoingReady) + go c.handleIncomingWithReady(incomingReady) + + // Wait for both handlers to be ready + <-outgoingReady + <-incomingReady glog.Infof("Connected to admin server at %s", c.adminAddress) return nil @@ -268,53 +307,16 @@ func (c *GrpcAdminClient) reconnect() error { if c.conn != nil { c.conn.Close() } + c.connected = false c.mutex.Unlock() - // Create new connection - conn, err := c.createConnection() + // Attempt to re-establish connection using the same logic as initial connection + err := c.attemptConnection() if err != nil { - return fmt.Errorf("failed to create connection: %w", err) - } - - client := worker_pb.NewWorkerServiceClient(conn) - - // Create new stream - streamCtx, streamCancel := context.WithCancel(context.Background()) - stream, err := client.WorkerStream(streamCtx) - if err != nil { - conn.Close() - streamCancel() - return fmt.Errorf("failed to create stream: %w", err) - } - - // Update client state - c.mutex.Lock() - c.conn = conn - c.client = client - c.stream = stream - c.streamCtx = streamCtx - c.streamCancel = streamCancel - c.connected = true - c.mutex.Unlock() - - // Restart stream handlers - go c.handleOutgoing() - go c.handleIncoming() - - // Re-register worker if we have previous registration info - c.mutex.RLock() - workerInfo := c.lastWorkerInfo - c.mutex.RUnlock() - - if workerInfo != nil { - glog.Infof("Re-registering worker after reconnection...") - if err := c.sendRegistration(workerInfo); err != nil { - glog.Errorf("Failed to re-register worker: %v", err) - // Don't fail the reconnection because of registration failure - // The registration will be retried on next heartbeat or operation - } + return fmt.Errorf("failed to reconnect: %w", err) } + // Registration is now handled in attemptConnection if worker info is available return nil } @@ -340,8 +342,19 @@ func (c *GrpcAdminClient) handleOutgoing() { } } +// handleOutgoingWithReady processes outgoing messages and signals when ready +func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) { + // Signal that this handler is ready to process messages + close(ready) + + // Now process messages normally + c.handleOutgoing() +} + // handleIncoming processes incoming messages from admin func (c *GrpcAdminClient) handleIncoming() { + glog.V(1).Infof("📡 INCOMING HANDLER STARTED: Worker %s incoming message handler started", c.workerID) + for { c.mutex.RLock() connected := c.connected @@ -349,15 +362,17 @@ func (c *GrpcAdminClient) handleIncoming() { c.mutex.RUnlock() if !connected { + glog.V(1).Infof("🔌 INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - not connected", c.workerID) break } + glog.V(4).Infof("👂 LISTENING: Worker %s waiting for message from admin server", c.workerID) msg, err := stream.Recv() if err != nil { if err == io.EOF { - glog.Infof("Admin server closed the stream") + glog.Infof("🔚 STREAM CLOSED: Worker %s admin server closed the stream", c.workerID) } else { - glog.Errorf("Failed to receive message from admin: %v", err) + glog.Errorf("❌ RECEIVE ERROR: Worker %s failed to receive message from admin: %v", c.workerID, err) } c.mutex.Lock() c.connected = false @@ -365,26 +380,42 @@ func (c *GrpcAdminClient) handleIncoming() { break } + glog.V(4).Infof("📨 MESSAGE RECEIVED: Worker %s received message from admin server: %T", c.workerID, msg.Message) + // Route message to waiting goroutines or general handler select { case c.incoming <- msg: + glog.V(3).Infof("✅ MESSAGE ROUTED: Worker %s successfully routed message to handler", c.workerID) case <-time.After(time.Second): - glog.Warningf("Incoming message buffer full, dropping message") + glog.Warningf("🚫 MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", c.workerID, msg.Message) } } + + glog.V(1).Infof("🏁 INCOMING HANDLER FINISHED: Worker %s incoming message handler finished", c.workerID) +} + +// handleIncomingWithReady processes incoming messages and signals when ready +func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) { + // Signal that this handler is ready to process messages + close(ready) + + // Now process messages normally + c.handleIncoming() } // RegisterWorker registers the worker with the admin server func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error { - if !c.connected { - return fmt.Errorf("not connected to admin server") - } - // Store worker info for re-registration after reconnection c.mutex.Lock() c.lastWorkerInfo = worker c.mutex.Unlock() + // If not connected, registration will happen when connection is established + if !c.connected { + glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection") + return nil + } + return c.sendRegistration(worker) } @@ -435,9 +466,88 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error { } } +// sendRegistrationSync sends the registration message synchronously +func (c *GrpcAdminClient) sendRegistrationSync(worker *types.Worker) error { + capabilities := make([]string, len(worker.Capabilities)) + for i, cap := range worker.Capabilities { + capabilities[i] = string(cap) + } + + msg := &worker_pb.WorkerMessage{ + WorkerId: c.workerID, + Timestamp: time.Now().Unix(), + Message: &worker_pb.WorkerMessage_Registration{ + Registration: &worker_pb.WorkerRegistration{ + WorkerId: c.workerID, + Address: worker.Address, + Capabilities: capabilities, + MaxConcurrent: int32(worker.MaxConcurrent), + Metadata: make(map[string]string), + }, + }, + } + + // Send directly to stream to ensure it's the first message + if err := c.stream.Send(msg); err != nil { + return fmt.Errorf("failed to send registration message: %w", err) + } + + // Create a channel to receive the response + responseChan := make(chan *worker_pb.AdminMessage, 1) + errChan := make(chan error, 1) + + // Start a goroutine to listen for the response + go func() { + for { + response, err := c.stream.Recv() + if err != nil { + errChan <- fmt.Errorf("failed to receive registration response: %w", err) + return + } + + if regResp := response.GetRegistrationResponse(); regResp != nil { + responseChan <- response + return + } + // Continue waiting if it's not a registration response + } + }() + + // Wait for registration response with timeout + timeout := time.NewTimer(10 * time.Second) + defer timeout.Stop() + + select { + case response := <-responseChan: + if regResp := response.GetRegistrationResponse(); regResp != nil { + if regResp.Success { + glog.V(1).Infof("Worker registered successfully: %s", regResp.Message) + return nil + } + return fmt.Errorf("registration failed: %s", regResp.Message) + } + return fmt.Errorf("unexpected response type") + case err := <-errChan: + return err + case <-timeout.C: + return fmt.Errorf("registration timeout") + } +} + // SendHeartbeat sends heartbeat to admin server func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { if !c.connected { + // If we're currently reconnecting, don't wait - just skip the heartbeat + c.mutex.RLock() + reconnecting := c.reconnecting + c.mutex.RUnlock() + + if reconnecting { + // Don't treat as an error - reconnection is in progress + glog.V(2).Infof("Skipping heartbeat during reconnection") + return nil + } + // Wait for reconnection for a short time if err := c.waitForConnection(10 * time.Second); err != nil { return fmt.Errorf("not connected to admin server: %w", err) @@ -477,6 +587,17 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta // RequestTask requests a new task from admin server func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) { if !c.connected { + // If we're currently reconnecting, don't wait - just return no task + c.mutex.RLock() + reconnecting := c.reconnecting + c.mutex.RUnlock() + + if reconnecting { + // Don't treat as an error - reconnection is in progress + glog.V(2).Infof("🔄 RECONNECTING: Worker %s skipping task request during reconnection", workerID) + return nil, nil + } + // Wait for reconnection for a short time if err := c.waitForConnection(5 * time.Second); err != nil { return nil, fmt.Errorf("not connected to admin server: %w", err) @@ -488,6 +609,9 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task caps[i] = string(cap) } + glog.V(3).Infof("📤 SENDING TASK REQUEST: Worker %s sending task request to admin server with capabilities: %v", + workerID, capabilities) + msg := &worker_pb.WorkerMessage{ WorkerId: c.workerID, Timestamp: time.Now().Unix(), @@ -502,23 +626,24 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task select { case c.outgoing <- msg: + glog.V(3).Infof("✅ TASK REQUEST SENT: Worker %s successfully sent task request to admin server", workerID) case <-time.After(time.Second): + glog.Errorf("❌ TASK REQUEST TIMEOUT: Worker %s failed to send task request: timeout", workerID) return nil, fmt.Errorf("failed to send task request: timeout") } // Wait for task assignment + glog.V(3).Infof("⏳ WAITING FOR RESPONSE: Worker %s waiting for task assignment response (5s timeout)", workerID) timeout := time.NewTimer(5 * time.Second) defer timeout.Stop() for { select { case response := <-c.incoming: + glog.V(3).Infof("📨 RESPONSE RECEIVED: Worker %s received response from admin server: %T", workerID, response.Message) if taskAssign := response.GetTaskAssignment(); taskAssign != nil { - // Convert parameters map[string]string to map[string]interface{} - parameters := make(map[string]interface{}) - for k, v := range taskAssign.Params.Parameters { - parameters[k] = v - } + glog.V(1).Infof("Worker %s received task assignment in response: %s (type: %s, volume: %d)", + workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId) // Convert to our task type task := &types.Task{ @@ -530,11 +655,15 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task Collection: taskAssign.Params.Collection, Priority: types.TaskPriority(taskAssign.Priority), CreatedAt: time.Unix(taskAssign.CreatedTime, 0), - Parameters: parameters, + // Use typed protobuf parameters directly + TypedParams: taskAssign.Params, } return task, nil + } else { + glog.V(3).Infof("📭 NON-TASK RESPONSE: Worker %s received non-task response: %T", workerID, response.Message) } case <-timeout.C: + glog.V(3).Infof("⏰ TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 5 seconds", workerID) return nil, nil // No task available } } @@ -542,24 +671,47 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task // CompleteTask reports task completion to admin server func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error { + return c.CompleteTaskWithMetadata(taskID, success, errorMsg, nil) +} + +// CompleteTaskWithMetadata reports task completion with additional metadata +func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error { if !c.connected { + // If we're currently reconnecting, don't wait - just skip the completion report + c.mutex.RLock() + reconnecting := c.reconnecting + c.mutex.RUnlock() + + if reconnecting { + // Don't treat as an error - reconnection is in progress + glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID) + return nil + } + // Wait for reconnection for a short time if err := c.waitForConnection(5 * time.Second); err != nil { return fmt.Errorf("not connected to admin server: %w", err) } } + taskComplete := &worker_pb.TaskComplete{ + TaskId: taskID, + WorkerId: c.workerID, + Success: success, + ErrorMessage: errorMsg, + CompletionTime: time.Now().Unix(), + } + + // Add metadata if provided + if metadata != nil { + taskComplete.ResultMetadata = metadata + } + msg := &worker_pb.WorkerMessage{ WorkerId: c.workerID, Timestamp: time.Now().Unix(), Message: &worker_pb.WorkerMessage_TaskComplete{ - TaskComplete: &worker_pb.TaskComplete{ - TaskId: taskID, - WorkerId: c.workerID, - Success: success, - ErrorMessage: errorMsg, - CompletionTime: time.Now().Unix(), - }, + TaskComplete: taskComplete, }, } @@ -574,6 +726,17 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str // UpdateTaskProgress updates task progress to admin server func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error { if !c.connected { + // If we're currently reconnecting, don't wait - just skip the progress update + c.mutex.RLock() + reconnecting := c.reconnecting + c.mutex.RUnlock() + + if reconnecting { + // Don't treat as an error - reconnection is in progress + glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID) + return nil + } + // Wait for reconnection for a short time if err := c.waitForConnection(5 * time.Second); err != nil { return fmt.Errorf("not connected to admin server: %w", err) @@ -663,6 +826,12 @@ func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error { return fmt.Errorf("timeout waiting for connection") } +// GetIncomingChannel returns the incoming message channel for message processing +// This allows the worker to process admin messages directly +func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage { + return c.incoming +} + // MockAdminClient provides a mock implementation for testing type MockAdminClient struct { workerID string @@ -741,6 +910,12 @@ func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) er return nil } +// CompleteTaskWithMetadata mock implementation +func (m *MockAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error { + glog.Infof("Mock: Task %s completed: success=%v, error=%s, metadata=%v", taskID, success, errorMsg, metadata) + return nil +} + // IsConnected mock implementation func (m *MockAdminClient) IsConnected() bool { m.mutex.RLock() |
