diff options
Diffstat (limited to 'weed/worker/client.go')
| -rw-r--r-- | weed/worker/client.go | 808 |
1 files changed, 383 insertions, 425 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go index 9066afdf3..0ec36e419 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -2,6 +2,7 @@ package worker import ( "context" + "errors" "fmt" "io" "sync" @@ -14,22 +15,18 @@ import ( "google.golang.org/grpc" ) +var ( + ErrAlreadyConnected = errors.New("already connected") +) + // GrpcAdminClient implements AdminClient using gRPC bidirectional streaming type GrpcAdminClient struct { adminAddress string workerID string dialOption grpc.DialOption - conn *grpc.ClientConn - client worker_pb.WorkerServiceClient - stream worker_pb.WorkerService_WorkerStreamClient - streamCtx context.Context - streamCancel context.CancelFunc - - connected bool - reconnecting bool - shouldReconnect bool - mutex sync.RWMutex + cmds chan grpcCommand + closeOnce sync.Once // Reconnection parameters maxReconnectAttempts int @@ -37,17 +34,48 @@ type GrpcAdminClient struct { maxReconnectBackoff time.Duration reconnectMultiplier float64 - // Worker registration info for re-registration after reconnection - lastWorkerInfo *types.WorkerData - // Channels for communication - outgoing chan *worker_pb.WorkerMessage - incoming chan *worker_pb.AdminMessage - responseChans map[string]chan *worker_pb.AdminMessage - responsesMutex sync.RWMutex + outgoing chan *worker_pb.WorkerMessage + incoming chan *worker_pb.AdminMessage + responseChans map[string]chan *worker_pb.AdminMessage +} + +type grpcAction string + +const ( + ActionConnect grpcAction = "connect" + ActionDisconnect grpcAction = "disconnect" + ActionReconnect grpcAction = "reconnect" + ActionStreamError grpcAction = "stream_error" + ActionRegisterWorker grpcAction = "register_worker" + ActionQueryReconnecting grpcAction = "query_reconnecting" + ActionQueryConnected grpcAction = "query_connected" + ActionQueryShouldReconnect grpcAction = "query_shouldreconnect" +) + +type registrationRequest struct { + Worker *types.WorkerData + Resp chan error // Used to send the registration result back +} - // Shutdown channel - shutdownChan chan struct{} +type grpcCommand struct { + action grpcAction + data any + resp chan error // for reporting success/failure +} + +type grpcState struct { + connected bool + reconnecting bool + shouldReconnect bool + conn *grpc.ClientConn + client worker_pb.WorkerServiceClient + stream worker_pb.WorkerService_WorkerStreamClient + streamCtx context.Context + streamCancel context.CancelFunc + lastWorkerInfo *types.WorkerData + reconnectStop chan struct{} + streamExit chan struct{} } // NewGrpcAdminClient creates a new gRPC admin client @@ -55,11 +83,10 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di // Admin uses HTTP port + 10000 as gRPC port grpcAddress := pb.ServerToGrpcAddress(adminAddress) - return &GrpcAdminClient{ + c := &GrpcAdminClient{ adminAddress: grpcAddress, workerID: workerID, dialOption: dialOption, - shouldReconnect: true, maxReconnectAttempts: 0, // 0 means infinite attempts reconnectBackoff: 1 * time.Second, maxReconnectBackoff: 30 * time.Second, @@ -67,65 +94,129 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di outgoing: make(chan *worker_pb.WorkerMessage, 100), incoming: make(chan *worker_pb.AdminMessage, 100), responseChans: make(map[string]chan *worker_pb.AdminMessage), - shutdownChan: make(chan struct{}), + cmds: make(chan grpcCommand), + } + go c.managerLoop() + return c +} + +func (c *GrpcAdminClient) managerLoop() { + state := &grpcState{shouldReconnect: true} + + for cmd := range c.cmds { + switch cmd.action { + case ActionConnect: + c.handleConnect(cmd, state) + case ActionDisconnect: + c.handleDisconnect(cmd, state) + case ActionReconnect: + if state.connected || state.reconnecting || !state.shouldReconnect { + cmd.resp <- ErrAlreadyConnected + continue + } + state.reconnecting = true // Manager acknowledges the attempt + err := c.reconnect(state) + state.reconnecting = false + cmd.resp <- err + case ActionStreamError: + state.connected = false + case ActionRegisterWorker: + req := cmd.data.(registrationRequest) + state.lastWorkerInfo = req.Worker + if !state.connected { + glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection") + // Respond immediately with success (registration will happen later) + req.Resp <- nil + continue + } + err := c.sendRegistration(req.Worker) + req.Resp <- err + case ActionQueryConnected: + respCh := cmd.data.(chan bool) + respCh <- state.connected + case ActionQueryReconnecting: + respCh := cmd.data.(chan bool) + respCh <- state.reconnecting + case ActionQueryShouldReconnect: + respCh := cmd.data.(chan bool) + respCh <- state.shouldReconnect + } } } // Connect establishes gRPC connection to admin server with TLS detection func (c *GrpcAdminClient) Connect() error { - c.mutex.Lock() - if c.connected { - c.mutex.Unlock() - return fmt.Errorf("already connected") + resp := make(chan error) + c.cmds <- grpcCommand{ + action: ActionConnect, + resp: resp, } - // Release lock before calling attemptConnection which needs to acquire locks internally - c.mutex.Unlock() + return <-resp +} - // Always start the reconnection loop, even if initial connection fails - go c.reconnectionLoop() +func (c *GrpcAdminClient) handleConnect(cmd grpcCommand, s *grpcState) { + if s.connected { + cmd.resp <- fmt.Errorf("already connected") + return + } + + // Start reconnection loop immediately (async) + stop := make(chan struct{}) + s.reconnectStop = stop + go c.reconnectionLoop(stop) - // Attempt initial connection - err := c.attemptConnection() + // Attempt the initial connection + err := c.attemptConnection(s) if err != nil { glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err) - return err + cmd.resp <- err + return } + cmd.resp <- nil +} - return nil +// createConnection attempts to connect using the provided dial option +func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption) + if err != nil { + return nil, fmt.Errorf("failed to connect to admin server: %w", err) + } + + glog.Infof("Connected to admin server at %s", c.adminAddress) + return conn, nil } // attemptConnection tries to establish the connection without managing the reconnection loop -func (c *GrpcAdminClient) attemptConnection() error { +func (c *GrpcAdminClient) attemptConnection(s *grpcState) error { // Detect TLS support and create appropriate connection conn, err := c.createConnection() if err != nil { return fmt.Errorf("failed to connect to admin server: %w", err) } - c.conn = conn - c.client = worker_pb.NewWorkerServiceClient(conn) + s.conn = conn + s.client = worker_pb.NewWorkerServiceClient(conn) // Create bidirectional stream - c.streamCtx, c.streamCancel = context.WithCancel(context.Background()) - stream, err := c.client.WorkerStream(c.streamCtx) + s.streamCtx, s.streamCancel = context.WithCancel(context.Background()) + stream, err := s.client.WorkerStream(s.streamCtx) + glog.Infof("Worker stream created") if err != nil { - c.conn.Close() + s.conn.Close() return fmt.Errorf("failed to create worker stream: %w", err) } - - c.stream = stream - c.connected = true + s.connected = true + s.stream = stream // Always check for worker info and send registration immediately as the very first message - c.mutex.RLock() - workerInfo := c.lastWorkerInfo - c.mutex.RUnlock() - - if workerInfo != nil { + if s.lastWorkerInfo != nil { // Send registration synchronously as the very first message - if err := c.sendRegistrationSync(workerInfo); err != nil { - c.conn.Close() - c.connected = false + if err := c.sendRegistrationSync(s.lastWorkerInfo, s.stream); err != nil { + s.conn.Close() + s.connected = false return fmt.Errorf("failed to register worker: %w", err) } glog.Infof("Worker registered successfully with admin server") @@ -134,290 +225,268 @@ func (c *GrpcAdminClient) attemptConnection() error { glog.V(1).Infof("Connected to admin server, waiting for worker registration info") } - // Start stream handlers with synchronization - outgoingReady := make(chan struct{}) - incomingReady := make(chan struct{}) - - go c.handleOutgoingWithReady(outgoingReady) - go c.handleIncomingWithReady(incomingReady) - - // Wait for both handlers to be ready - <-outgoingReady - <-incomingReady + // Start stream handlers + s.streamExit = make(chan struct{}) + go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds) + go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds) glog.Infof("Connected to admin server at %s", c.adminAddress) return nil } -// createConnection attempts to connect using the provided dial option -func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption) - if err != nil { - return nil, fmt.Errorf("failed to connect to admin server: %w", err) - } - - glog.Infof("Connected to admin server at %s", c.adminAddress) - return conn, nil -} - -// Disconnect closes the gRPC connection -func (c *GrpcAdminClient) Disconnect() error { - c.mutex.Lock() - defer c.mutex.Unlock() - - if !c.connected { - return nil - } - - c.connected = false - c.shouldReconnect = false - - // Send shutdown signal to stop reconnection loop - select { - case c.shutdownChan <- struct{}{}: - default: - } - - // Send shutdown message - shutdownMsg := &worker_pb.WorkerMessage{ - WorkerId: c.workerID, - Timestamp: time.Now().Unix(), - Message: &worker_pb.WorkerMessage_Shutdown{ - Shutdown: &worker_pb.WorkerShutdown{ - WorkerId: c.workerID, - Reason: "normal shutdown", - }, - }, +// reconnect attempts to re-establish the connection +func (c *GrpcAdminClient) reconnect(s *grpcState) error { + // Clean up existing connection completely + if s.streamCancel != nil { + s.streamCancel() } - - select { - case c.outgoing <- shutdownMsg: - case <-time.After(time.Second): - glog.Warningf("Failed to send shutdown message") + if s.stream != nil { + s.stream.CloseSend() } - - // Cancel stream context - if c.streamCancel != nil { - c.streamCancel() + if s.conn != nil { + s.conn.Close() } + s.connected = false - // Close stream - if c.stream != nil { - c.stream.CloseSend() - } - - // Close connection - if c.conn != nil { - c.conn.Close() + // Attempt to re-establish connection using the same logic as initial connection + if err := c.attemptConnection(s); err != nil { + return fmt.Errorf("failed to reconnect: %w", err) } - // Close channels - close(c.outgoing) - close(c.incoming) - - glog.Infof("Disconnected from admin server") + // Registration is now handled in attemptConnection if worker info is available return nil } // reconnectionLoop handles automatic reconnection with exponential backoff -func (c *GrpcAdminClient) reconnectionLoop() { +func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) { backoff := c.reconnectBackoff attempts := 0 for { + waitDuration := backoff + if attempts == 0 { + waitDuration = time.Second + } select { - case <-c.shutdownChan: + case <-reconnectStop: return - default: + case <-time.After(waitDuration): } - - c.mutex.RLock() - shouldReconnect := c.shouldReconnect && !c.connected && !c.reconnecting - c.mutex.RUnlock() - - if !shouldReconnect { - time.Sleep(time.Second) - continue + resp := make(chan error, 1) + c.cmds <- grpcCommand{ + action: ActionReconnect, + resp: resp, } - - c.mutex.Lock() - c.reconnecting = true - c.mutex.Unlock() - - glog.Infof("Attempting to reconnect to admin server (attempt %d)", attempts+1) - - // Attempt to reconnect - if err := c.reconnect(); err != nil { + err := <-resp + if err == nil { + // Successful reconnection + attempts = 0 + backoff = c.reconnectBackoff + glog.Infof("Successfully reconnected to admin server") + } else if errors.Is(err, ErrAlreadyConnected) { + attempts = 0 + backoff = c.reconnectBackoff + } else { attempts++ glog.Errorf("Reconnection attempt %d failed: %v", attempts, err) - // Reset reconnecting flag - c.mutex.Lock() - c.reconnecting = false - c.mutex.Unlock() - // Check if we should give up if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts { glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts) - c.mutex.Lock() - c.shouldReconnect = false - c.mutex.Unlock() return } - // Wait with exponential backoff - glog.Infof("Waiting %v before next reconnection attempt", backoff) - - select { - case <-c.shutdownChan: - return - case <-time.After(backoff): - } - // Increase backoff backoff = time.Duration(float64(backoff) * c.reconnectMultiplier) if backoff > c.maxReconnectBackoff { backoff = c.maxReconnectBackoff } - } else { - // Successful reconnection - attempts = 0 - backoff = c.reconnectBackoff - glog.Infof("Successfully reconnected to admin server") - - c.mutex.Lock() - c.reconnecting = false - c.mutex.Unlock() + glog.Infof("Waiting %v before next reconnection attempt", backoff) } } } -// reconnect attempts to re-establish the connection -func (c *GrpcAdminClient) reconnect() error { - // Clean up existing connection completely - c.mutex.Lock() - if c.streamCancel != nil { - c.streamCancel() - } - if c.stream != nil { - c.stream.CloseSend() - } - if c.conn != nil { - c.conn.Close() - } - c.connected = false - c.mutex.Unlock() - - // Attempt to re-establish connection using the same logic as initial connection - err := c.attemptConnection() - if err != nil { - return fmt.Errorf("failed to reconnect: %w", err) - } - - // Registration is now handled in attemptConnection if worker info is available - return nil -} - // handleOutgoing processes outgoing messages to admin -func (c *GrpcAdminClient) handleOutgoing() { - for msg := range c.outgoing { - c.mutex.RLock() - connected := c.connected - stream := c.stream - c.mutex.RUnlock() - - if !connected { - break +func handleOutgoing( + stream worker_pb.WorkerService_WorkerStreamClient, + streamExit <-chan struct{}, + outgoing <-chan *worker_pb.WorkerMessage, + cmds chan<- grpcCommand) { + + msgCh := make(chan *worker_pb.WorkerMessage) + errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy + // Goroutine to handle blocking stream.Recv() and simultaneously handle exit + // signals + go func() { + for msg := range msgCh { + if err := stream.Send(msg); err != nil { + errCh <- err + return // Exit the receiver goroutine on error/EOF + } } + close(errCh) + }() - if err := stream.Send(msg); err != nil { + for msg := range outgoing { + select { + case msgCh <- msg: + case err := <-errCh: glog.Errorf("Failed to send message to admin: %v", err) - c.mutex.Lock() - c.connected = false - c.mutex.Unlock() - break + cmds <- grpcCommand{action: ActionStreamError, data: err} + return + case <-streamExit: + close(msgCh) + <-errCh + return } } } -// handleOutgoingWithReady processes outgoing messages and signals when ready -func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) { - // Signal that this handler is ready to process messages - close(ready) - - // Now process messages normally - c.handleOutgoing() -} - // handleIncoming processes incoming messages from admin -func (c *GrpcAdminClient) handleIncoming() { - glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", c.workerID) +func handleIncoming( + workerID string, + stream worker_pb.WorkerService_WorkerStreamClient, + streamExit <-chan struct{}, + incoming chan<- *worker_pb.AdminMessage, + cmds chan<- grpcCommand) { + glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID) + msgCh := make(chan *worker_pb.AdminMessage) + errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy + // Goroutine to handle blocking stream.Recv() and simultaneously handle exit + // signals + go func() { + for { + msg, err := stream.Recv() + if err != nil { + errCh <- err + return // Exit the receiver goroutine on error/EOF + } + msgCh <- msg + } + }() for { - c.mutex.RLock() - connected := c.connected - stream := c.stream - c.mutex.RUnlock() - - if !connected { - glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - not connected", c.workerID) - break - } + glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", workerID) + + select { + case msg := <-msgCh: + // Message successfully received from the stream + glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message) - glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", c.workerID) - msg, err := stream.Recv() - if err != nil { + // Route message to waiting goroutines or general handler (original select logic) + select { + case incoming <- msg: + glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID) + case <-time.After(time.Second): + glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", workerID, msg.Message) + } + + case err := <-errCh: + // Stream Receiver goroutine reported an error (EOF or network error) if err == io.EOF { - glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", c.workerID) + glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", workerID) } else { - glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", c.workerID, err) + glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err) } - c.mutex.Lock() - c.connected = false - c.mutex.Unlock() - break - } - glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", c.workerID, msg.Message) + // Report the failure as a command to the managerLoop (blocking) + cmds <- grpcCommand{action: ActionStreamError, data: err} - // Route message to waiting goroutines or general handler - select { - case c.incoming <- msg: - glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", c.workerID) - case <-time.After(time.Second): - glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", c.workerID, msg.Message) + // Exit the main handler loop + glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error", workerID) + return + + case <-streamExit: + // Manager closed this channel, signaling a controlled disconnection. + glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - received exit signal", workerID) + return } } +} - glog.V(1).Infof("INCOMING HANDLER FINISHED: Worker %s incoming message handler finished", c.workerID) +// Connect establishes gRPC connection to admin server with TLS detection +func (c *GrpcAdminClient) Disconnect() error { + resp := make(chan error) + c.cmds <- grpcCommand{ + action: ActionDisconnect, + resp: resp, + } + err := <-resp + c.closeOnce.Do(func() { + close(c.cmds) + }) + return err } -// handleIncomingWithReady processes incoming messages and signals when ready -func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) { - // Signal that this handler is ready to process messages - close(ready) +func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { + if !s.connected { + cmd.resp <- fmt.Errorf("already disconnected") + return + } + + // Send shutdown signal to stop reconnection loop + close(s.reconnectStop) + + // Send shutdown signal to stop handlers loop + close(s.streamExit) + + s.connected = false + s.shouldReconnect = false + + // Send shutdown message + shutdownMsg := &worker_pb.WorkerMessage{ + WorkerId: c.workerID, + Timestamp: time.Now().Unix(), + Message: &worker_pb.WorkerMessage_Shutdown{ + Shutdown: &worker_pb.WorkerShutdown{ + WorkerId: c.workerID, + Reason: "normal shutdown", + }, + }, + } - // Now process messages normally - c.handleIncoming() + // Close outgoing/incoming + select { + case c.outgoing <- shutdownMsg: + case <-time.After(time.Second): + glog.Warningf("Failed to send shutdown message") + } + + // Cancel stream context + if s.streamCancel != nil { + s.streamCancel() + } + + // Close stream + if s.stream != nil { + s.stream.CloseSend() + } + + // Close connection + if s.conn != nil { + s.conn.Close() + } + + // Close channels + close(c.outgoing) + close(c.incoming) + + glog.Infof("Disconnected from admin server") + cmd.resp <- nil } // RegisterWorker registers the worker with the admin server func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error { - // Store worker info for re-registration after reconnection - c.mutex.Lock() - c.lastWorkerInfo = worker - c.mutex.Unlock() - - // If not connected, registration will happen when connection is established - if !c.connected { - glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection") - return nil + respCh := make(chan error, 1) + request := registrationRequest{ + Worker: worker, + Resp: respCh, } - - return c.sendRegistration(worker) + c.cmds <- grpcCommand{ + action: ActionRegisterWorker, + data: request, + } + return <-respCh } // sendRegistration sends the registration message and waits for response @@ -468,7 +537,7 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { } // sendRegistrationSync sends the registration message synchronously -func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { +func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData, stream worker_pb.WorkerService_WorkerStreamClient) error { capabilities := make([]string, len(worker.Capabilities)) for i, cap := range worker.Capabilities { capabilities[i] = string(cap) @@ -489,7 +558,7 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { } // Send directly to stream to ensure it's the first message - if err := c.stream.Send(msg); err != nil { + if err := stream.Send(msg); err != nil { return fmt.Errorf("failed to send registration message: %w", err) } @@ -500,7 +569,7 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { // Start a goroutine to listen for the response go func() { for { - response, err := c.stream.Recv() + response, err := stream.Recv() if err != nil { errChan <- fmt.Errorf("failed to receive registration response: %w", err) return @@ -511,6 +580,8 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { return } // Continue waiting if it's not a registration response + // If stream is stuck, reconnect() will kill it, cleaning up this + // goroutine } }() @@ -535,13 +606,44 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { } } +func (c *GrpcAdminClient) IsConnected() bool { + respCh := make(chan bool, 1) + + c.cmds <- grpcCommand{ + action: ActionQueryConnected, + data: respCh, + } + + return <-respCh +} + +func (c *GrpcAdminClient) IsReconnecting() bool { + respCh := make(chan bool, 1) + + c.cmds <- grpcCommand{ + action: ActionQueryReconnecting, + data: respCh, + } + + return <-respCh +} + +func (c *GrpcAdminClient) ShouldReconnect() bool { + respCh := make(chan bool, 1) + + c.cmds <- grpcCommand{ + action: ActionQueryShouldReconnect, + data: respCh, + } + + return <-respCh +} + // SendHeartbeat sends heartbeat to admin server func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { - if !c.connected { + if !c.IsConnected() { // If we're currently reconnecting, don't wait - just skip the heartbeat - c.mutex.RLock() - reconnecting := c.reconnecting - c.mutex.RUnlock() + reconnecting := c.IsReconnecting() if reconnecting { // Don't treat as an error - reconnection is in progress @@ -587,11 +689,9 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta // RequestTask requests a new task from admin server func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) { - if !c.connected { + if !c.IsConnected() { // If we're currently reconnecting, don't wait - just return no task - c.mutex.RLock() - reconnecting := c.reconnecting - c.mutex.RUnlock() + reconnecting := c.IsReconnecting() if reconnecting { // Don't treat as an error - reconnection is in progress @@ -677,11 +777,9 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str // CompleteTaskWithMetadata reports task completion with additional metadata func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error { - if !c.connected { + if !c.IsConnected() { // If we're currently reconnecting, don't wait - just skip the completion report - c.mutex.RLock() - reconnecting := c.reconnecting - c.mutex.RUnlock() + reconnecting := c.IsReconnecting() if reconnecting { // Don't treat as an error - reconnection is in progress @@ -726,11 +824,9 @@ func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, // UpdateTaskProgress updates task progress to admin server func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error { - if !c.connected { + if !c.IsConnected() { // If we're currently reconnecting, don't wait - just skip the progress update - c.mutex.RLock() - reconnecting := c.reconnecting - c.mutex.RUnlock() + reconnecting := c.IsReconnecting() if reconnecting { // Don't treat as an error - reconnection is in progress @@ -765,53 +861,13 @@ func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) er } } -// IsConnected returns whether the client is connected -func (c *GrpcAdminClient) IsConnected() bool { - c.mutex.RLock() - defer c.mutex.RUnlock() - return c.connected -} - -// IsReconnecting returns whether the client is currently attempting to reconnect -func (c *GrpcAdminClient) IsReconnecting() bool { - c.mutex.RLock() - defer c.mutex.RUnlock() - return c.reconnecting -} - -// SetReconnectionSettings allows configuration of reconnection behavior -func (c *GrpcAdminClient) SetReconnectionSettings(maxAttempts int, initialBackoff, maxBackoff time.Duration, multiplier float64) { - c.mutex.Lock() - defer c.mutex.Unlock() - c.maxReconnectAttempts = maxAttempts - c.reconnectBackoff = initialBackoff - c.maxReconnectBackoff = maxBackoff - c.reconnectMultiplier = multiplier -} - -// StopReconnection stops the reconnection loop -func (c *GrpcAdminClient) StopReconnection() { - c.mutex.Lock() - defer c.mutex.Unlock() - c.shouldReconnect = false -} - -// StartReconnection starts the reconnection loop -func (c *GrpcAdminClient) StartReconnection() { - c.mutex.Lock() - defer c.mutex.Unlock() - c.shouldReconnect = true -} - // waitForConnection waits for the connection to be established or timeout func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error { deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { - c.mutex.RLock() - connected := c.connected - shouldReconnect := c.shouldReconnect - c.mutex.RUnlock() + connected := c.IsConnected() + shouldReconnect := c.ShouldReconnect() if connected { return nil @@ -833,104 +889,6 @@ func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage { return c.incoming } -// MockAdminClient provides a mock implementation for testing -type MockAdminClient struct { - workerID string - connected bool - tasks []*types.TaskInput - mutex sync.RWMutex -} - -// NewMockAdminClient creates a new mock admin client -func NewMockAdminClient() *MockAdminClient { - return &MockAdminClient{ - connected: true, - tasks: make([]*types.TaskInput, 0), - } -} - -// Connect mock implementation -func (m *MockAdminClient) Connect() error { - m.mutex.Lock() - defer m.mutex.Unlock() - m.connected = true - return nil -} - -// Disconnect mock implementation -func (m *MockAdminClient) Disconnect() error { - m.mutex.Lock() - defer m.mutex.Unlock() - m.connected = false - return nil -} - -// RegisterWorker mock implementation -func (m *MockAdminClient) RegisterWorker(worker *types.WorkerData) error { - m.workerID = worker.ID - glog.Infof("Mock: Worker %s registered with capabilities: %v", worker.ID, worker.Capabilities) - return nil -} - -// SendHeartbeat mock implementation -func (m *MockAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error { - glog.V(2).Infof("Mock: Heartbeat from worker %s, status: %s, load: %d/%d", - workerID, status.Status, status.CurrentLoad, status.MaxConcurrent) - return nil -} - -// RequestTask mock implementation -func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) { - m.mutex.Lock() - defer m.mutex.Unlock() - - if len(m.tasks) > 0 { - task := m.tasks[0] - m.tasks = m.tasks[1:] - glog.Infof("Mock: Assigned task %s to worker %s", task.ID, workerID) - return task, nil - } - - // No tasks available - return nil, nil -} - -// CompleteTask mock implementation -func (m *MockAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error { - if success { - glog.Infof("Mock: Task %s completed successfully", taskID) - } else { - glog.Infof("Mock: Task %s failed: %s", taskID, errorMsg) - } - return nil -} - -// UpdateTaskProgress mock implementation -func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) error { - glog.V(2).Infof("Mock: Task %s progress: %.1f%%", taskID, progress) - return nil -} - -// CompleteTaskWithMetadata mock implementation -func (m *MockAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error { - glog.Infof("Mock: Task %s completed: success=%v, error=%s, metadata=%v", taskID, success, errorMsg, metadata) - return nil -} - -// IsConnected mock implementation -func (m *MockAdminClient) IsConnected() bool { - m.mutex.RLock() - defer m.mutex.RUnlock() - return m.connected -} - -// AddMockTask adds a mock task for testing -func (m *MockAdminClient) AddMockTask(task *types.TaskInput) { - m.mutex.Lock() - defer m.mutex.Unlock() - m.tasks = append(m.tasks, task) -} - // CreateAdminClient creates an admin client with the provided dial option func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) { return NewGrpcAdminClient(adminServer, workerID, dialOption), nil |
