aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/client.go')
-rw-r--r--weed/worker/client.go307
1 files changed, 241 insertions, 66 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go
index 60b33fb31..53854c6e3 100644
--- a/weed/worker/client.go
+++ b/weed/worker/client.go
@@ -80,6 +80,21 @@ func (c *GrpcAdminClient) Connect() error {
return fmt.Errorf("already connected")
}
+ // Always start the reconnection loop, even if initial connection fails
+ go c.reconnectionLoop()
+
+ // Attempt initial connection
+ err := c.attemptConnection()
+ if err != nil {
+ glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err)
+ return err
+ }
+
+ return nil
+}
+
+// attemptConnection tries to establish the connection without managing the reconnection loop
+func (c *GrpcAdminClient) attemptConnection() error {
// Detect TLS support and create appropriate connection
conn, err := c.createConnection()
if err != nil {
@@ -100,10 +115,34 @@ func (c *GrpcAdminClient) Connect() error {
c.stream = stream
c.connected = true
- // Start stream handlers and reconnection loop
- go c.handleOutgoing()
- go c.handleIncoming()
- go c.reconnectionLoop()
+ // Always check for worker info and send registration immediately as the very first message
+ c.mutex.RLock()
+ workerInfo := c.lastWorkerInfo
+ c.mutex.RUnlock()
+
+ if workerInfo != nil {
+ // Send registration synchronously as the very first message
+ if err := c.sendRegistrationSync(workerInfo); err != nil {
+ c.conn.Close()
+ c.connected = false
+ return fmt.Errorf("failed to register worker: %w", err)
+ }
+ glog.Infof("Worker registered successfully with admin server")
+ } else {
+ // No worker info yet - stream will wait for registration
+ glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
+ }
+
+ // Start stream handlers with synchronization
+ outgoingReady := make(chan struct{})
+ incomingReady := make(chan struct{})
+
+ go c.handleOutgoingWithReady(outgoingReady)
+ go c.handleIncomingWithReady(incomingReady)
+
+ // Wait for both handlers to be ready
+ <-outgoingReady
+ <-incomingReady
glog.Infof("Connected to admin server at %s", c.adminAddress)
return nil
@@ -268,53 +307,16 @@ func (c *GrpcAdminClient) reconnect() error {
if c.conn != nil {
c.conn.Close()
}
+ c.connected = false
c.mutex.Unlock()
- // Create new connection
- conn, err := c.createConnection()
+ // Attempt to re-establish connection using the same logic as initial connection
+ err := c.attemptConnection()
if err != nil {
- return fmt.Errorf("failed to create connection: %w", err)
- }
-
- client := worker_pb.NewWorkerServiceClient(conn)
-
- // Create new stream
- streamCtx, streamCancel := context.WithCancel(context.Background())
- stream, err := client.WorkerStream(streamCtx)
- if err != nil {
- conn.Close()
- streamCancel()
- return fmt.Errorf("failed to create stream: %w", err)
- }
-
- // Update client state
- c.mutex.Lock()
- c.conn = conn
- c.client = client
- c.stream = stream
- c.streamCtx = streamCtx
- c.streamCancel = streamCancel
- c.connected = true
- c.mutex.Unlock()
-
- // Restart stream handlers
- go c.handleOutgoing()
- go c.handleIncoming()
-
- // Re-register worker if we have previous registration info
- c.mutex.RLock()
- workerInfo := c.lastWorkerInfo
- c.mutex.RUnlock()
-
- if workerInfo != nil {
- glog.Infof("Re-registering worker after reconnection...")
- if err := c.sendRegistration(workerInfo); err != nil {
- glog.Errorf("Failed to re-register worker: %v", err)
- // Don't fail the reconnection because of registration failure
- // The registration will be retried on next heartbeat or operation
- }
+ return fmt.Errorf("failed to reconnect: %w", err)
}
+ // Registration is now handled in attemptConnection if worker info is available
return nil
}
@@ -340,8 +342,19 @@ func (c *GrpcAdminClient) handleOutgoing() {
}
}
+// handleOutgoingWithReady processes outgoing messages and signals when ready
+func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) {
+ // Signal that this handler is ready to process messages
+ close(ready)
+
+ // Now process messages normally
+ c.handleOutgoing()
+}
+
// handleIncoming processes incoming messages from admin
func (c *GrpcAdminClient) handleIncoming() {
+ glog.V(1).Infof("📡 INCOMING HANDLER STARTED: Worker %s incoming message handler started", c.workerID)
+
for {
c.mutex.RLock()
connected := c.connected
@@ -349,15 +362,17 @@ func (c *GrpcAdminClient) handleIncoming() {
c.mutex.RUnlock()
if !connected {
+ glog.V(1).Infof("🔌 INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - not connected", c.workerID)
break
}
+ glog.V(4).Infof("👂 LISTENING: Worker %s waiting for message from admin server", c.workerID)
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
- glog.Infof("Admin server closed the stream")
+ glog.Infof("🔚 STREAM CLOSED: Worker %s admin server closed the stream", c.workerID)
} else {
- glog.Errorf("Failed to receive message from admin: %v", err)
+ glog.Errorf("❌ RECEIVE ERROR: Worker %s failed to receive message from admin: %v", c.workerID, err)
}
c.mutex.Lock()
c.connected = false
@@ -365,26 +380,42 @@ func (c *GrpcAdminClient) handleIncoming() {
break
}
+ glog.V(4).Infof("📨 MESSAGE RECEIVED: Worker %s received message from admin server: %T", c.workerID, msg.Message)
+
// Route message to waiting goroutines or general handler
select {
case c.incoming <- msg:
+ glog.V(3).Infof("✅ MESSAGE ROUTED: Worker %s successfully routed message to handler", c.workerID)
case <-time.After(time.Second):
- glog.Warningf("Incoming message buffer full, dropping message")
+ glog.Warningf("🚫 MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", c.workerID, msg.Message)
}
}
+
+ glog.V(1).Infof("🏁 INCOMING HANDLER FINISHED: Worker %s incoming message handler finished", c.workerID)
+}
+
+// handleIncomingWithReady processes incoming messages and signals when ready
+func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) {
+ // Signal that this handler is ready to process messages
+ close(ready)
+
+ // Now process messages normally
+ c.handleIncoming()
}
// RegisterWorker registers the worker with the admin server
func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error {
- if !c.connected {
- return fmt.Errorf("not connected to admin server")
- }
-
// Store worker info for re-registration after reconnection
c.mutex.Lock()
c.lastWorkerInfo = worker
c.mutex.Unlock()
+ // If not connected, registration will happen when connection is established
+ if !c.connected {
+ glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection")
+ return nil
+ }
+
return c.sendRegistration(worker)
}
@@ -435,9 +466,88 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error {
}
}
+// sendRegistrationSync sends the registration message synchronously
+func (c *GrpcAdminClient) sendRegistrationSync(worker *types.Worker) error {
+ capabilities := make([]string, len(worker.Capabilities))
+ for i, cap := range worker.Capabilities {
+ capabilities[i] = string(cap)
+ }
+
+ msg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_Registration{
+ Registration: &worker_pb.WorkerRegistration{
+ WorkerId: c.workerID,
+ Address: worker.Address,
+ Capabilities: capabilities,
+ MaxConcurrent: int32(worker.MaxConcurrent),
+ Metadata: make(map[string]string),
+ },
+ },
+ }
+
+ // Send directly to stream to ensure it's the first message
+ if err := c.stream.Send(msg); err != nil {
+ return fmt.Errorf("failed to send registration message: %w", err)
+ }
+
+ // Create a channel to receive the response
+ responseChan := make(chan *worker_pb.AdminMessage, 1)
+ errChan := make(chan error, 1)
+
+ // Start a goroutine to listen for the response
+ go func() {
+ for {
+ response, err := c.stream.Recv()
+ if err != nil {
+ errChan <- fmt.Errorf("failed to receive registration response: %w", err)
+ return
+ }
+
+ if regResp := response.GetRegistrationResponse(); regResp != nil {
+ responseChan <- response
+ return
+ }
+ // Continue waiting if it's not a registration response
+ }
+ }()
+
+ // Wait for registration response with timeout
+ timeout := time.NewTimer(10 * time.Second)
+ defer timeout.Stop()
+
+ select {
+ case response := <-responseChan:
+ if regResp := response.GetRegistrationResponse(); regResp != nil {
+ if regResp.Success {
+ glog.V(1).Infof("Worker registered successfully: %s", regResp.Message)
+ return nil
+ }
+ return fmt.Errorf("registration failed: %s", regResp.Message)
+ }
+ return fmt.Errorf("unexpected response type")
+ case err := <-errChan:
+ return err
+ case <-timeout.C:
+ return fmt.Errorf("registration timeout")
+ }
+}
+
// SendHeartbeat sends heartbeat to admin server
func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just skip the heartbeat
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("Skipping heartbeat during reconnection")
+ return nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(10 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %w", err)
@@ -477,6 +587,17 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta
// RequestTask requests a new task from admin server
func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just return no task
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("🔄 RECONNECTING: Worker %s skipping task request during reconnection", workerID)
+ return nil, nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return nil, fmt.Errorf("not connected to admin server: %w", err)
@@ -488,6 +609,9 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
caps[i] = string(cap)
}
+ glog.V(3).Infof("📤 SENDING TASK REQUEST: Worker %s sending task request to admin server with capabilities: %v",
+ workerID, capabilities)
+
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
@@ -502,23 +626,24 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
select {
case c.outgoing <- msg:
+ glog.V(3).Infof("✅ TASK REQUEST SENT: Worker %s successfully sent task request to admin server", workerID)
case <-time.After(time.Second):
+ glog.Errorf("❌ TASK REQUEST TIMEOUT: Worker %s failed to send task request: timeout", workerID)
return nil, fmt.Errorf("failed to send task request: timeout")
}
// Wait for task assignment
+ glog.V(3).Infof("⏳ WAITING FOR RESPONSE: Worker %s waiting for task assignment response (5s timeout)", workerID)
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
for {
select {
case response := <-c.incoming:
+ glog.V(3).Infof("📨 RESPONSE RECEIVED: Worker %s received response from admin server: %T", workerID, response.Message)
if taskAssign := response.GetTaskAssignment(); taskAssign != nil {
- // Convert parameters map[string]string to map[string]interface{}
- parameters := make(map[string]interface{})
- for k, v := range taskAssign.Params.Parameters {
- parameters[k] = v
- }
+ glog.V(1).Infof("Worker %s received task assignment in response: %s (type: %s, volume: %d)",
+ workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
// Convert to our task type
task := &types.Task{
@@ -530,11 +655,15 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
Collection: taskAssign.Params.Collection,
Priority: types.TaskPriority(taskAssign.Priority),
CreatedAt: time.Unix(taskAssign.CreatedTime, 0),
- Parameters: parameters,
+ // Use typed protobuf parameters directly
+ TypedParams: taskAssign.Params,
}
return task, nil
+ } else {
+ glog.V(3).Infof("📭 NON-TASK RESPONSE: Worker %s received non-task response: %T", workerID, response.Message)
}
case <-timeout.C:
+ glog.V(3).Infof("⏰ TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 5 seconds", workerID)
return nil, nil // No task available
}
}
@@ -542,24 +671,47 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
// CompleteTask reports task completion to admin server
func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
+ return c.CompleteTaskWithMetadata(taskID, success, errorMsg, nil)
+}
+
+// CompleteTaskWithMetadata reports task completion with additional metadata
+func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just skip the completion report
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID)
+ return nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %w", err)
}
}
+ taskComplete := &worker_pb.TaskComplete{
+ TaskId: taskID,
+ WorkerId: c.workerID,
+ Success: success,
+ ErrorMessage: errorMsg,
+ CompletionTime: time.Now().Unix(),
+ }
+
+ // Add metadata if provided
+ if metadata != nil {
+ taskComplete.ResultMetadata = metadata
+ }
+
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_TaskComplete{
- TaskComplete: &worker_pb.TaskComplete{
- TaskId: taskID,
- WorkerId: c.workerID,
- Success: success,
- ErrorMessage: errorMsg,
- CompletionTime: time.Now().Unix(),
- },
+ TaskComplete: taskComplete,
},
}
@@ -574,6 +726,17 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str
// UpdateTaskProgress updates task progress to admin server
func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just skip the progress update
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID)
+ return nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %w", err)
@@ -663,6 +826,12 @@ func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
return fmt.Errorf("timeout waiting for connection")
}
+// GetIncomingChannel returns the incoming message channel for message processing
+// This allows the worker to process admin messages directly
+func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage {
+ return c.incoming
+}
+
// MockAdminClient provides a mock implementation for testing
type MockAdminClient struct {
workerID string
@@ -741,6 +910,12 @@ func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) er
return nil
}
+// CompleteTaskWithMetadata mock implementation
+func (m *MockAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
+ glog.Infof("Mock: Task %s completed: success=%v, error=%s, metadata=%v", taskID, success, errorMsg, metadata)
+ return nil
+}
+
// IsConnected mock implementation
func (m *MockAdminClient) IsConnected() bool {
m.mutex.RLock()