author    Mariano Ntrougkas <44480600+marios1861@users.noreply.github.com>    2025-10-23 02:16:46 +0300
committer    GitHub <noreply@github.com>    2025-10-22 16:16:46 -0700
commit    fa025dc96f5279ad90bcff13ca67ab1a7a52db9c (patch)
tree    6889b76de06420c5ca86699dcb1cae443df9dd00
parent    f7bd75ef3bf993a8f4aa5b647617556ab7567596 (diff)
download    seaweedfs-fa025dc96f5279ad90bcff13ca67ab1a7a52db9c.tar.xz
            seaweedfs-fa025dc96f5279ad90bcff13ca67ab1a7a52db9c.zip
♻️ refactor(worker): decouple state management using command-query pattern (#7354)
* ♻️ refactor(worker): decouple state management using command-query pattern

  This commit eliminates all uses of sync.Mutex across the `worker.go` and `client.go` components, changing how mutable state is accessed and modified. The Single Owner Principle is now enforced.

  - Guarantees thread safety and prevents data races by ensuring that only one goroutine ever modifies or reads state.

  Impact: Improves application concurrency, reliability, and maintainability by isolating state concerns.

* 🐛 fix(worker): fix race condition when closing

  The use of select/default is wrong for mandatory shutdown signals.

* 🐛 fix(worker): do not get tickers in every iteration

* 🐛 fix(worker): fix race condition when closing pt 2

  Refactor `handleOutgoing` to mirror the non-blocking logic of `handleIncoming`.

* address comments

* To ensure stream errors are always processed, the send should be blocking.

* avoid blocking the manager loop while waiting for tasks to complete

---------

Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Co-authored-by: Chris Lu <chris.lu@gmail.com>
-rw-r--r--    weed/worker/client.go    808
-rw-r--r--    weed/worker/worker.go    523
2 files changed, 753 insertions, 578 deletions
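
The commit message describes replacing mutexes with a single manager goroutine that owns all mutable state and serves commands and queries over a channel. Before the diff, here is a minimal, self-contained sketch of that command-query pattern; it is illustrative only and not taken from this commit (the counter type and all its names are hypothetical):

package main

import "fmt"

// counterCommand is either a mutation (delta) or a query (reply != nil).
type counterCommand struct {
	delta int
	reply chan int // non-nil for queries; the manager sends the current value back
}

// counter owns its state through a single manager goroutine.
type counter struct {
	cmds chan counterCommand
}

func newCounter() *counter {
	c := &counter{cmds: make(chan counterCommand)}
	go c.managerLoop()
	return c
}

// managerLoop is the single owner of value: no mutex is needed because only
// this goroutine ever reads or writes it.
func (c *counter) managerLoop() {
	value := 0
	for cmd := range c.cmds {
		if cmd.reply != nil {
			cmd.reply <- value
			continue
		}
		value += cmd.delta
	}
}

// Add sends a command; Value sends a query and waits for the reply.
func (c *counter) Add(n int) { c.cmds <- counterCommand{delta: n} }

func (c *counter) Value() int {
	reply := make(chan int, 1)
	c.cmds <- counterCommand{reply: reply}
	return <-reply
}

func main() {
	c := newCounter()
	c.Add(2)
	c.Add(3)
	fmt.Println(c.Value()) // prints 5
}

In the diff below, `grpcCommand`/`workerCommand` play the role of `counterCommand`, and `managerLoop` in both files is the single owner of the former mutex-protected fields.
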
diff --git a/weed/worker/client.go b/weed/worker/client.go
index 9066afdf3..0ec36e419 100644
--- a/weed/worker/client.go
+++ b/weed/worker/client.go
@@ -2,6 +2,7 @@ package worker
import (
"context"
+ "errors"
"fmt"
"io"
"sync"
@@ -14,22 +15,18 @@ import (
"google.golang.org/grpc"
)
+var (
+ ErrAlreadyConnected = errors.New("already connected")
+)
+
// GrpcAdminClient implements AdminClient using gRPC bidirectional streaming
type GrpcAdminClient struct {
adminAddress string
workerID string
dialOption grpc.DialOption
- conn *grpc.ClientConn
- client worker_pb.WorkerServiceClient
- stream worker_pb.WorkerService_WorkerStreamClient
- streamCtx context.Context
- streamCancel context.CancelFunc
-
- connected bool
- reconnecting bool
- shouldReconnect bool
- mutex sync.RWMutex
+ cmds chan grpcCommand
+ closeOnce sync.Once
// Reconnection parameters
maxReconnectAttempts int
@@ -37,17 +34,48 @@ type GrpcAdminClient struct {
maxReconnectBackoff time.Duration
reconnectMultiplier float64
- // Worker registration info for re-registration after reconnection
- lastWorkerInfo *types.WorkerData
-
// Channels for communication
- outgoing chan *worker_pb.WorkerMessage
- incoming chan *worker_pb.AdminMessage
- responseChans map[string]chan *worker_pb.AdminMessage
- responsesMutex sync.RWMutex
+ outgoing chan *worker_pb.WorkerMessage
+ incoming chan *worker_pb.AdminMessage
+ responseChans map[string]chan *worker_pb.AdminMessage
+}
+
+type grpcAction string
+
+const (
+ ActionConnect grpcAction = "connect"
+ ActionDisconnect grpcAction = "disconnect"
+ ActionReconnect grpcAction = "reconnect"
+ ActionStreamError grpcAction = "stream_error"
+ ActionRegisterWorker grpcAction = "register_worker"
+ ActionQueryReconnecting grpcAction = "query_reconnecting"
+ ActionQueryConnected grpcAction = "query_connected"
+ ActionQueryShouldReconnect grpcAction = "query_shouldreconnect"
+)
+
+type registrationRequest struct {
+ Worker *types.WorkerData
+ Resp chan error // Used to send the registration result back
+}
- // Shutdown channel
- shutdownChan chan struct{}
+type grpcCommand struct {
+ action grpcAction
+ data any
+ resp chan error // for reporting success/failure
+}
+
+type grpcState struct {
+ connected bool
+ reconnecting bool
+ shouldReconnect bool
+ conn *grpc.ClientConn
+ client worker_pb.WorkerServiceClient
+ stream worker_pb.WorkerService_WorkerStreamClient
+ streamCtx context.Context
+ streamCancel context.CancelFunc
+ lastWorkerInfo *types.WorkerData
+ reconnectStop chan struct{}
+ streamExit chan struct{}
}
// NewGrpcAdminClient creates a new gRPC admin client
@@ -55,11 +83,10 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di
// Admin uses HTTP port + 10000 as gRPC port
grpcAddress := pb.ServerToGrpcAddress(adminAddress)
- return &GrpcAdminClient{
+ c := &GrpcAdminClient{
adminAddress: grpcAddress,
workerID: workerID,
dialOption: dialOption,
- shouldReconnect: true,
maxReconnectAttempts: 0, // 0 means infinite attempts
reconnectBackoff: 1 * time.Second,
maxReconnectBackoff: 30 * time.Second,
@@ -67,65 +94,129 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di
outgoing: make(chan *worker_pb.WorkerMessage, 100),
incoming: make(chan *worker_pb.AdminMessage, 100),
responseChans: make(map[string]chan *worker_pb.AdminMessage),
- shutdownChan: make(chan struct{}),
+ cmds: make(chan grpcCommand),
+ }
+ go c.managerLoop()
+ return c
+}
+
+func (c *GrpcAdminClient) managerLoop() {
+ state := &grpcState{shouldReconnect: true}
+
+ for cmd := range c.cmds {
+ switch cmd.action {
+ case ActionConnect:
+ c.handleConnect(cmd, state)
+ case ActionDisconnect:
+ c.handleDisconnect(cmd, state)
+ case ActionReconnect:
+ if state.connected || state.reconnecting || !state.shouldReconnect {
+ cmd.resp <- ErrAlreadyConnected
+ continue
+ }
+ state.reconnecting = true // Manager acknowledges the attempt
+ err := c.reconnect(state)
+ state.reconnecting = false
+ cmd.resp <- err
+ case ActionStreamError:
+ state.connected = false
+ case ActionRegisterWorker:
+ req := cmd.data.(registrationRequest)
+ state.lastWorkerInfo = req.Worker
+ if !state.connected {
+ glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection")
+ // Respond immediately with success (registration will happen later)
+ req.Resp <- nil
+ continue
+ }
+ err := c.sendRegistration(req.Worker)
+ req.Resp <- err
+ case ActionQueryConnected:
+ respCh := cmd.data.(chan bool)
+ respCh <- state.connected
+ case ActionQueryReconnecting:
+ respCh := cmd.data.(chan bool)
+ respCh <- state.reconnecting
+ case ActionQueryShouldReconnect:
+ respCh := cmd.data.(chan bool)
+ respCh <- state.shouldReconnect
+ }
}
}
// Connect establishes gRPC connection to admin server with TLS detection
func (c *GrpcAdminClient) Connect() error {
- c.mutex.Lock()
- if c.connected {
- c.mutex.Unlock()
- return fmt.Errorf("already connected")
+ resp := make(chan error)
+ c.cmds <- grpcCommand{
+ action: ActionConnect,
+ resp: resp,
}
- // Release lock before calling attemptConnection which needs to acquire locks internally
- c.mutex.Unlock()
+ return <-resp
+}
- // Always start the reconnection loop, even if initial connection fails
- go c.reconnectionLoop()
+func (c *GrpcAdminClient) handleConnect(cmd grpcCommand, s *grpcState) {
+ if s.connected {
+ cmd.resp <- fmt.Errorf("already connected")
+ return
+ }
+
+ // Start reconnection loop immediately (async)
+ stop := make(chan struct{})
+ s.reconnectStop = stop
+ go c.reconnectionLoop(stop)
- // Attempt initial connection
- err := c.attemptConnection()
+ // Attempt the initial connection
+ err := c.attemptConnection(s)
if err != nil {
glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err)
- return err
+ cmd.resp <- err
+ return
}
+ cmd.resp <- nil
+}
- return nil
+// createConnection attempts to connect using the provided dial option
+func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption)
+ if err != nil {
+ return nil, fmt.Errorf("failed to connect to admin server: %w", err)
+ }
+
+ glog.Infof("Connected to admin server at %s", c.adminAddress)
+ return conn, nil
}
// attemptConnection tries to establish the connection without managing the reconnection loop
-func (c *GrpcAdminClient) attemptConnection() error {
+func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
// Detect TLS support and create appropriate connection
conn, err := c.createConnection()
if err != nil {
return fmt.Errorf("failed to connect to admin server: %w", err)
}
- c.conn = conn
- c.client = worker_pb.NewWorkerServiceClient(conn)
+ s.conn = conn
+ s.client = worker_pb.NewWorkerServiceClient(conn)
// Create bidirectional stream
- c.streamCtx, c.streamCancel = context.WithCancel(context.Background())
- stream, err := c.client.WorkerStream(c.streamCtx)
+ s.streamCtx, s.streamCancel = context.WithCancel(context.Background())
+ stream, err := s.client.WorkerStream(s.streamCtx)
+ glog.Infof("Worker stream created")
if err != nil {
- c.conn.Close()
+ s.conn.Close()
return fmt.Errorf("failed to create worker stream: %w", err)
}
-
- c.stream = stream
- c.connected = true
+ s.connected = true
+ s.stream = stream
// Always check for worker info and send registration immediately as the very first message
- c.mutex.RLock()
- workerInfo := c.lastWorkerInfo
- c.mutex.RUnlock()
-
- if workerInfo != nil {
+ if s.lastWorkerInfo != nil {
// Send registration synchronously as the very first message
- if err := c.sendRegistrationSync(workerInfo); err != nil {
- c.conn.Close()
- c.connected = false
+ if err := c.sendRegistrationSync(s.lastWorkerInfo, s.stream); err != nil {
+ s.conn.Close()
+ s.connected = false
return fmt.Errorf("failed to register worker: %w", err)
}
glog.Infof("Worker registered successfully with admin server")
@@ -134,290 +225,268 @@ func (c *GrpcAdminClient) attemptConnection() error {
glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
}
- // Start stream handlers with synchronization
- outgoingReady := make(chan struct{})
- incomingReady := make(chan struct{})
-
- go c.handleOutgoingWithReady(outgoingReady)
- go c.handleIncomingWithReady(incomingReady)
-
- // Wait for both handlers to be ready
- <-outgoingReady
- <-incomingReady
+ // Start stream handlers
+ s.streamExit = make(chan struct{})
+ go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds)
+ go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds)
glog.Infof("Connected to admin server at %s", c.adminAddress)
return nil
}
-// createConnection attempts to connect using the provided dial option
-func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption)
- if err != nil {
- return nil, fmt.Errorf("failed to connect to admin server: %w", err)
- }
-
- glog.Infof("Connected to admin server at %s", c.adminAddress)
- return conn, nil
-}
-
-// Disconnect closes the gRPC connection
-func (c *GrpcAdminClient) Disconnect() error {
- c.mutex.Lock()
- defer c.mutex.Unlock()
-
- if !c.connected {
- return nil
- }
-
- c.connected = false
- c.shouldReconnect = false
-
- // Send shutdown signal to stop reconnection loop
- select {
- case c.shutdownChan <- struct{}{}:
- default:
- }
-
- // Send shutdown message
- shutdownMsg := &worker_pb.WorkerMessage{
- WorkerId: c.workerID,
- Timestamp: time.Now().Unix(),
- Message: &worker_pb.WorkerMessage_Shutdown{
- Shutdown: &worker_pb.WorkerShutdown{
- WorkerId: c.workerID,
- Reason: "normal shutdown",
- },
- },
+// reconnect attempts to re-establish the connection
+func (c *GrpcAdminClient) reconnect(s *grpcState) error {
+ // Clean up existing connection completely
+ if s.streamCancel != nil {
+ s.streamCancel()
}
-
- select {
- case c.outgoing <- shutdownMsg:
- case <-time.After(time.Second):
- glog.Warningf("Failed to send shutdown message")
+ if s.stream != nil {
+ s.stream.CloseSend()
}
-
- // Cancel stream context
- if c.streamCancel != nil {
- c.streamCancel()
+ if s.conn != nil {
+ s.conn.Close()
}
+ s.connected = false
- // Close stream
- if c.stream != nil {
- c.stream.CloseSend()
- }
-
- // Close connection
- if c.conn != nil {
- c.conn.Close()
+ // Attempt to re-establish connection using the same logic as initial connection
+ if err := c.attemptConnection(s); err != nil {
+ return fmt.Errorf("failed to reconnect: %w", err)
}
- // Close channels
- close(c.outgoing)
- close(c.incoming)
-
- glog.Infof("Disconnected from admin server")
+ // Registration is now handled in attemptConnection if worker info is available
return nil
}
// reconnectionLoop handles automatic reconnection with exponential backoff
-func (c *GrpcAdminClient) reconnectionLoop() {
+func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) {
backoff := c.reconnectBackoff
attempts := 0
for {
+ waitDuration := backoff
+ if attempts == 0 {
+ waitDuration = time.Second
+ }
select {
- case <-c.shutdownChan:
+ case <-reconnectStop:
return
- default:
+ case <-time.After(waitDuration):
}
-
- c.mutex.RLock()
- shouldReconnect := c.shouldReconnect && !c.connected && !c.reconnecting
- c.mutex.RUnlock()
-
- if !shouldReconnect {
- time.Sleep(time.Second)
- continue
+ resp := make(chan error, 1)
+ c.cmds <- grpcCommand{
+ action: ActionReconnect,
+ resp: resp,
}
-
- c.mutex.Lock()
- c.reconnecting = true
- c.mutex.Unlock()
-
- glog.Infof("Attempting to reconnect to admin server (attempt %d)", attempts+1)
-
- // Attempt to reconnect
- if err := c.reconnect(); err != nil {
+ err := <-resp
+ if err == nil {
+ // Successful reconnection
+ attempts = 0
+ backoff = c.reconnectBackoff
+ glog.Infof("Successfully reconnected to admin server")
+ } else if errors.Is(err, ErrAlreadyConnected) {
+ attempts = 0
+ backoff = c.reconnectBackoff
+ } else {
attempts++
glog.Errorf("Reconnection attempt %d failed: %v", attempts, err)
- // Reset reconnecting flag
- c.mutex.Lock()
- c.reconnecting = false
- c.mutex.Unlock()
-
// Check if we should give up
if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts {
glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts)
- c.mutex.Lock()
- c.shouldReconnect = false
- c.mutex.Unlock()
return
}
- // Wait with exponential backoff
- glog.Infof("Waiting %v before next reconnection attempt", backoff)
-
- select {
- case <-c.shutdownChan:
- return
- case <-time.After(backoff):
- }
-
// Increase backoff
backoff = time.Duration(float64(backoff) * c.reconnectMultiplier)
if backoff > c.maxReconnectBackoff {
backoff = c.maxReconnectBackoff
}
- } else {
- // Successful reconnection
- attempts = 0
- backoff = c.reconnectBackoff
- glog.Infof("Successfully reconnected to admin server")
-
- c.mutex.Lock()
- c.reconnecting = false
- c.mutex.Unlock()
+ glog.Infof("Waiting %v before next reconnection attempt", backoff)
}
}
}
-// reconnect attempts to re-establish the connection
-func (c *GrpcAdminClient) reconnect() error {
- // Clean up existing connection completely
- c.mutex.Lock()
- if c.streamCancel != nil {
- c.streamCancel()
- }
- if c.stream != nil {
- c.stream.CloseSend()
- }
- if c.conn != nil {
- c.conn.Close()
- }
- c.connected = false
- c.mutex.Unlock()
-
- // Attempt to re-establish connection using the same logic as initial connection
- err := c.attemptConnection()
- if err != nil {
- return fmt.Errorf("failed to reconnect: %w", err)
- }
-
- // Registration is now handled in attemptConnection if worker info is available
- return nil
-}
-
// handleOutgoing processes outgoing messages to admin
-func (c *GrpcAdminClient) handleOutgoing() {
- for msg := range c.outgoing {
- c.mutex.RLock()
- connected := c.connected
- stream := c.stream
- c.mutex.RUnlock()
-
- if !connected {
- break
+func handleOutgoing(
+ stream worker_pb.WorkerService_WorkerStreamClient,
+ streamExit <-chan struct{},
+ outgoing <-chan *worker_pb.WorkerMessage,
+ cmds chan<- grpcCommand) {
+
+ msgCh := make(chan *worker_pb.WorkerMessage)
+ errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy
+ // Goroutine to handle blocking stream.Send() while the loop below watches
+ // for exit signals
+ go func() {
+ for msg := range msgCh {
+ if err := stream.Send(msg); err != nil {
+ errCh <- err
+ return // Exit the receiver goroutine on error/EOF
+ }
}
+ close(errCh)
+ }()
- if err := stream.Send(msg); err != nil {
+ for msg := range outgoing {
+ select {
+ case msgCh <- msg:
+ case err := <-errCh:
glog.Errorf("Failed to send message to admin: %v", err)
- c.mutex.Lock()
- c.connected = false
- c.mutex.Unlock()
- break
+ cmds <- grpcCommand{action: ActionStreamError, data: err}
+ return
+ case <-streamExit:
+ close(msgCh)
+ <-errCh
+ return
}
}
}
-// handleOutgoingWithReady processes outgoing messages and signals when ready
-func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) {
- // Signal that this handler is ready to process messages
- close(ready)
-
- // Now process messages normally
- c.handleOutgoing()
-}
-
// handleIncoming processes incoming messages from admin
-func (c *GrpcAdminClient) handleIncoming() {
- glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", c.workerID)
+func handleIncoming(
+ workerID string,
+ stream worker_pb.WorkerService_WorkerStreamClient,
+ streamExit <-chan struct{},
+ incoming chan<- *worker_pb.AdminMessage,
+ cmds chan<- grpcCommand) {
+ glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID)
+ msgCh := make(chan *worker_pb.AdminMessage)
+ errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy
+ // Goroutine to handle blocking stream.Recv() and simultaneously handle exit
+ // signals
+ go func() {
+ for {
+ msg, err := stream.Recv()
+ if err != nil {
+ errCh <- err
+ return // Exit the receiver goroutine on error/EOF
+ }
+ msgCh <- msg
+ }
+ }()
for {
- c.mutex.RLock()
- connected := c.connected
- stream := c.stream
- c.mutex.RUnlock()
-
- if !connected {
- glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - not connected", c.workerID)
- break
- }
+ glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", workerID)
+
+ select {
+ case msg := <-msgCh:
+ // Message successfully received from the stream
+ glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message)
- glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", c.workerID)
- msg, err := stream.Recv()
- if err != nil {
+ // Route message to waiting goroutines or general handler (original select logic)
+ select {
+ case incoming <- msg:
+ glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID)
+ case <-time.After(time.Second):
+ glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", workerID, msg.Message)
+ }
+
+ case err := <-errCh:
+ // Stream Receiver goroutine reported an error (EOF or network error)
if err == io.EOF {
- glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", c.workerID)
+ glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", workerID)
} else {
- glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", c.workerID, err)
+ glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err)
}
- c.mutex.Lock()
- c.connected = false
- c.mutex.Unlock()
- break
- }
- glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", c.workerID, msg.Message)
+ // Report the failure as a command to the managerLoop (blocking)
+ cmds <- grpcCommand{action: ActionStreamError, data: err}
- // Route message to waiting goroutines or general handler
- select {
- case c.incoming <- msg:
- glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", c.workerID)
- case <-time.After(time.Second):
- glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", c.workerID, msg.Message)
+ // Exit the main handler loop
+ glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error", workerID)
+ return
+
+ case <-streamExit:
+ // Manager closed this channel, signaling a controlled disconnection.
+ glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - received exit signal", workerID)
+ return
}
}
+}
- glog.V(1).Infof("INCOMING HANDLER FINISHED: Worker %s incoming message handler finished", c.workerID)
+// Disconnect closes the gRPC connection and shuts down the manager loop
+func (c *GrpcAdminClient) Disconnect() error {
+ resp := make(chan error)
+ c.cmds <- grpcCommand{
+ action: ActionDisconnect,
+ resp: resp,
+ }
+ err := <-resp
+ c.closeOnce.Do(func() {
+ close(c.cmds)
+ })
+ return err
}
-// handleIncomingWithReady processes incoming messages and signals when ready
-func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) {
- // Signal that this handler is ready to process messages
- close(ready)
+func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
+ if !s.connected {
+ cmd.resp <- fmt.Errorf("already disconnected")
+ return
+ }
+
+ // Send shutdown signal to stop reconnection loop
+ close(s.reconnectStop)
+
+ // Send shutdown signal to stop handlers loop
+ close(s.streamExit)
+
+ s.connected = false
+ s.shouldReconnect = false
+
+ // Send shutdown message
+ shutdownMsg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_Shutdown{
+ Shutdown: &worker_pb.WorkerShutdown{
+ WorkerId: c.workerID,
+ Reason: "normal shutdown",
+ },
+ },
+ }
- // Now process messages normally
- c.handleIncoming()
+ // Try to deliver the shutdown message without blocking indefinitely
+ select {
+ case c.outgoing <- shutdownMsg:
+ case <-time.After(time.Second):
+ glog.Warningf("Failed to send shutdown message")
+ }
+
+ // Cancel stream context
+ if s.streamCancel != nil {
+ s.streamCancel()
+ }
+
+ // Close stream
+ if s.stream != nil {
+ s.stream.CloseSend()
+ }
+
+ // Close connection
+ if s.conn != nil {
+ s.conn.Close()
+ }
+
+ // Close channels
+ close(c.outgoing)
+ close(c.incoming)
+
+ glog.Infof("Disconnected from admin server")
+ cmd.resp <- nil
}
// RegisterWorker registers the worker with the admin server
func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error {
- // Store worker info for re-registration after reconnection
- c.mutex.Lock()
- c.lastWorkerInfo = worker
- c.mutex.Unlock()
-
- // If not connected, registration will happen when connection is established
- if !c.connected {
- glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection")
- return nil
+ respCh := make(chan error, 1)
+ request := registrationRequest{
+ Worker: worker,
+ Resp: respCh,
}
-
- return c.sendRegistration(worker)
+ c.cmds <- grpcCommand{
+ action: ActionRegisterWorker,
+ data: request,
+ }
+ return <-respCh
}
// sendRegistration sends the registration message and waits for response
@@ -468,7 +537,7 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
}
// sendRegistrationSync sends the registration message synchronously
-func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error {
+func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData, stream worker_pb.WorkerService_WorkerStreamClient) error {
capabilities := make([]string, len(worker.Capabilities))
for i, cap := range worker.Capabilities {
capabilities[i] = string(cap)
@@ -489,7 +558,7 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error {
}
// Send directly to stream to ensure it's the first message
- if err := c.stream.Send(msg); err != nil {
+ if err := stream.Send(msg); err != nil {
return fmt.Errorf("failed to send registration message: %w", err)
}
@@ -500,7 +569,7 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error {
// Start a goroutine to listen for the response
go func() {
for {
- response, err := c.stream.Recv()
+ response, err := stream.Recv()
if err != nil {
errChan <- fmt.Errorf("failed to receive registration response: %w", err)
return
@@ -511,6 +580,8 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error {
return
}
// Continue waiting if it's not a registration response
+ // If stream is stuck, reconnect() will kill it, cleaning up this
+ // goroutine
}
}()
@@ -535,13 +606,44 @@ func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error {
}
}
+func (c *GrpcAdminClient) IsConnected() bool {
+ respCh := make(chan bool, 1)
+
+ c.cmds <- grpcCommand{
+ action: ActionQueryConnected,
+ data: respCh,
+ }
+
+ return <-respCh
+}
+
+func (c *GrpcAdminClient) IsReconnecting() bool {
+ respCh := make(chan bool, 1)
+
+ c.cmds <- grpcCommand{
+ action: ActionQueryReconnecting,
+ data: respCh,
+ }
+
+ return <-respCh
+}
+
+func (c *GrpcAdminClient) ShouldReconnect() bool {
+ respCh := make(chan bool, 1)
+
+ c.cmds <- grpcCommand{
+ action: ActionQueryShouldReconnect,
+ data: respCh,
+ }
+
+ return <-respCh
+}
+
// SendHeartbeat sends heartbeat to admin server
func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
- if !c.connected {
+ if !c.IsConnected() {
// If we're currently reconnecting, don't wait - just skip the heartbeat
- c.mutex.RLock()
- reconnecting := c.reconnecting
- c.mutex.RUnlock()
+ reconnecting := c.IsReconnecting()
if reconnecting {
// Don't treat as an error - reconnection is in progress
@@ -587,11 +689,9 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta
// RequestTask requests a new task from admin server
func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
- if !c.connected {
+ if !c.IsConnected() {
// If we're currently reconnecting, don't wait - just return no task
- c.mutex.RLock()
- reconnecting := c.reconnecting
- c.mutex.RUnlock()
+ reconnecting := c.IsReconnecting()
if reconnecting {
// Don't treat as an error - reconnection is in progress
@@ -677,11 +777,9 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str
// CompleteTaskWithMetadata reports task completion with additional metadata
func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
- if !c.connected {
+ if !c.IsConnected() {
// If we're currently reconnecting, don't wait - just skip the completion report
- c.mutex.RLock()
- reconnecting := c.reconnecting
- c.mutex.RUnlock()
+ reconnecting := c.IsReconnecting()
if reconnecting {
// Don't treat as an error - reconnection is in progress
@@ -726,11 +824,9 @@ func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool,
// UpdateTaskProgress updates task progress to admin server
func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
- if !c.connected {
+ if !c.IsConnected() {
// If we're currently reconnecting, don't wait - just skip the progress update
- c.mutex.RLock()
- reconnecting := c.reconnecting
- c.mutex.RUnlock()
+ reconnecting := c.IsReconnecting()
if reconnecting {
// Don't treat as an error - reconnection is in progress
@@ -765,53 +861,13 @@ func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) er
}
}
-// IsConnected returns whether the client is connected
-func (c *GrpcAdminClient) IsConnected() bool {
- c.mutex.RLock()
- defer c.mutex.RUnlock()
- return c.connected
-}
-
-// IsReconnecting returns whether the client is currently attempting to reconnect
-func (c *GrpcAdminClient) IsReconnecting() bool {
- c.mutex.RLock()
- defer c.mutex.RUnlock()
- return c.reconnecting
-}
-
-// SetReconnectionSettings allows configuration of reconnection behavior
-func (c *GrpcAdminClient) SetReconnectionSettings(maxAttempts int, initialBackoff, maxBackoff time.Duration, multiplier float64) {
- c.mutex.Lock()
- defer c.mutex.Unlock()
- c.maxReconnectAttempts = maxAttempts
- c.reconnectBackoff = initialBackoff
- c.maxReconnectBackoff = maxBackoff
- c.reconnectMultiplier = multiplier
-}
-
-// StopReconnection stops the reconnection loop
-func (c *GrpcAdminClient) StopReconnection() {
- c.mutex.Lock()
- defer c.mutex.Unlock()
- c.shouldReconnect = false
-}
-
-// StartReconnection starts the reconnection loop
-func (c *GrpcAdminClient) StartReconnection() {
- c.mutex.Lock()
- defer c.mutex.Unlock()
- c.shouldReconnect = true
-}
-
// waitForConnection waits for the connection to be established or timeout
func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
- c.mutex.RLock()
- connected := c.connected
- shouldReconnect := c.shouldReconnect
- c.mutex.RUnlock()
+ connected := c.IsConnected()
+ shouldReconnect := c.ShouldReconnect()
if connected {
return nil
@@ -833,104 +889,6 @@ func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage {
return c.incoming
}
-// MockAdminClient provides a mock implementation for testing
-type MockAdminClient struct {
- workerID string
- connected bool
- tasks []*types.TaskInput
- mutex sync.RWMutex
-}
-
-// NewMockAdminClient creates a new mock admin client
-func NewMockAdminClient() *MockAdminClient {
- return &MockAdminClient{
- connected: true,
- tasks: make([]*types.TaskInput, 0),
- }
-}
-
-// Connect mock implementation
-func (m *MockAdminClient) Connect() error {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.connected = true
- return nil
-}
-
-// Disconnect mock implementation
-func (m *MockAdminClient) Disconnect() error {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.connected = false
- return nil
-}
-
-// RegisterWorker mock implementation
-func (m *MockAdminClient) RegisterWorker(worker *types.WorkerData) error {
- m.workerID = worker.ID
- glog.Infof("Mock: Worker %s registered with capabilities: %v", worker.ID, worker.Capabilities)
- return nil
-}
-
-// SendHeartbeat mock implementation
-func (m *MockAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
- glog.V(2).Infof("Mock: Heartbeat from worker %s, status: %s, load: %d/%d",
- workerID, status.Status, status.CurrentLoad, status.MaxConcurrent)
- return nil
-}
-
-// RequestTask mock implementation
-func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
-
- if len(m.tasks) > 0 {
- task := m.tasks[0]
- m.tasks = m.tasks[1:]
- glog.Infof("Mock: Assigned task %s to worker %s", task.ID, workerID)
- return task, nil
- }
-
- // No tasks available
- return nil, nil
-}
-
-// CompleteTask mock implementation
-func (m *MockAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
- if success {
- glog.Infof("Mock: Task %s completed successfully", taskID)
- } else {
- glog.Infof("Mock: Task %s failed: %s", taskID, errorMsg)
- }
- return nil
-}
-
-// UpdateTaskProgress mock implementation
-func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
- glog.V(2).Infof("Mock: Task %s progress: %.1f%%", taskID, progress)
- return nil
-}
-
-// CompleteTaskWithMetadata mock implementation
-func (m *MockAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
- glog.Infof("Mock: Task %s completed: success=%v, error=%s, metadata=%v", taskID, success, errorMsg, metadata)
- return nil
-}
-
-// IsConnected mock implementation
-func (m *MockAdminClient) IsConnected() bool {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- return m.connected
-}
-
-// AddMockTask adds a mock task for testing
-func (m *MockAdminClient) AddMockTask(task *types.TaskInput) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- m.tasks = append(m.tasks, task)
-}
-
// CreateAdminClient creates an admin client with the provided dial option
func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) {
return NewGrpcAdminClient(adminServer, workerID, dialOption), nil
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index 0763fdc2e..afc203318 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -23,20 +23,56 @@ import (
// Worker represents a maintenance worker instance
type Worker struct {
- id string
- config *types.WorkerConfig
- registry *tasks.TaskRegistry
- currentTasks map[string]*types.TaskInput
- adminClient AdminClient
+ id string
+ config *types.WorkerConfig
+ registry *tasks.TaskRegistry
+ cmds chan workerCommand
+ state *workerState
+ taskLogHandler *tasks.TaskLogHandler
+ closeOnce sync.Once
+}
+type workerState struct {
running bool
- stopChan chan struct{}
- mutex sync.RWMutex
+ adminClient AdminClient
startTime time.Time
- tasksCompleted int
- tasksFailed int
+ stopChan chan struct{}
heartbeatTicker *time.Ticker
requestTicker *time.Ticker
- taskLogHandler *tasks.TaskLogHandler
+ currentTasks map[string]*types.TaskInput
+ tasksCompleted int
+ tasksFailed int
+}
+
+type workerAction string
+
+const (
+ ActionStart workerAction = "start"
+ ActionStop workerAction = "stop"
+ ActionGetStatus workerAction = "getstatus"
+ ActionGetTaskLoad workerAction = "getload"
+ ActionSetTask workerAction = "settask"
+ ActionSetAdmin workerAction = "setadmin"
+ ActionRemoveTask workerAction = "removetask"
+ ActionGetAdmin workerAction = "getadmin"
+ ActionIncTaskFail workerAction = "inctaskfail"
+ ActionIncTaskComplete workerAction = "inctaskcomplete"
+ ActionGetHbTick workerAction = "gethbtick"
+ ActionGetReqTick workerAction = "getreqtick"
+ ActionGetStopChan workerAction = "getstopchan"
+ ActionSetHbTick workerAction = "sethbtick"
+ ActionSetReqTick workerAction = "setreqtick"
+ ActionGetStartTime workerAction = "getstarttime"
+ ActionGetCompletedTasks workerAction = "getcompletedtasks"
+ ActionGetFailedTasks workerAction = "getfailedtasks"
+ ActionCancelTask workerAction = "canceltask"
+ // ... other worker actions like Stop, Status, etc.
+)
+
+type statusResponse chan types.WorkerStatus
+type workerCommand struct {
+ action workerAction
+ data any
+ resp chan error // for reporting success/failure
}
// AdminClient defines the interface for communicating with the admin server
@@ -150,17 +186,222 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) {
id: workerID,
config: config,
registry: registry,
- currentTasks: make(map[string]*types.TaskInput),
- stopChan: make(chan struct{}),
- startTime: time.Now(),
taskLogHandler: taskLogHandler,
+ cmds: make(chan workerCommand),
}
glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetAll()))
-
+ go worker.managerLoop()
return worker, nil
}
+func (w *Worker) managerLoop() {
+ w.state = &workerState{
+ startTime: time.Now(),
+ stopChan: make(chan struct{}),
+ currentTasks: make(map[string]*types.TaskInput),
+ }
+
+ for cmd := range w.cmds {
+ switch cmd.action {
+ case ActionStart:
+ w.handleStart(cmd)
+ case ActionStop:
+ w.handleStop(cmd)
+ case ActionGetStatus:
+ respCh := cmd.data.(statusResponse)
+ var currentTasks []types.TaskInput
+ for _, task := range w.state.currentTasks {
+ currentTasks = append(currentTasks, *task)
+ }
+
+ statusStr := "active"
+ if len(w.state.currentTasks) >= w.config.MaxConcurrent {
+ statusStr = "busy"
+ }
+
+ status := types.WorkerStatus{
+ WorkerID: w.id,
+ Status: statusStr,
+ Capabilities: w.config.Capabilities,
+ MaxConcurrent: w.config.MaxConcurrent,
+ CurrentLoad: len(w.state.currentTasks),
+ LastHeartbeat: time.Now(),
+ CurrentTasks: currentTasks,
+ Uptime: time.Since(w.state.startTime),
+ TasksCompleted: w.state.tasksCompleted,
+ TasksFailed: w.state.tasksFailed,
+ }
+ respCh <- status
+ case ActionGetTaskLoad:
+ respCh := cmd.data.(chan int)
+ respCh <- len(w.state.currentTasks)
+ case ActionSetTask:
+ currentLoad := len(w.state.currentTasks)
+ if currentLoad >= w.config.MaxConcurrent {
+ cmd.resp <- fmt.Errorf("worker is at capacity")
+ continue
+ }
+ task := cmd.data.(*types.TaskInput)
+ w.state.currentTasks[task.ID] = task
+ cmd.resp <- nil
+ case ActionSetAdmin:
+ admin := cmd.data.(AdminClient)
+ w.state.adminClient = admin
+ case ActionRemoveTask:
+ taskID := cmd.data.(string)
+ delete(w.state.currentTasks, taskID)
+ case ActionGetAdmin:
+ respCh := cmd.data.(chan AdminClient)
+ respCh <- w.state.adminClient
+ case ActionIncTaskFail:
+ w.state.tasksFailed++
+ case ActionIncTaskComplete:
+ w.state.tasksCompleted++
+ case ActionGetHbTick:
+ respCh := cmd.data.(chan *time.Ticker)
+ respCh <- w.state.heartbeatTicker
+ case ActionGetReqTick:
+ respCh := cmd.data.(chan *time.Ticker)
+ respCh <- w.state.requestTicker
+ case ActionSetHbTick:
+ w.state.heartbeatTicker = cmd.data.(*time.Ticker)
+ case ActionSetReqTick:
+ w.state.requestTicker = cmd.data.(*time.Ticker)
+ case ActionGetStopChan:
+ cmd.data.(chan chan struct{}) <- w.state.stopChan
+ case ActionGetStartTime:
+ cmd.data.(chan time.Time) <- w.state.startTime
+ case ActionGetCompletedTasks:
+ cmd.data.(chan int) <- w.state.tasksCompleted
+ case ActionGetFailedTasks:
+ cmd.data.(chan int) <- w.state.tasksFailed
+ case ActionCancelTask:
+ taskID := cmd.data.(string)
+ if task, exists := w.state.currentTasks[taskID]; exists {
+ glog.Infof("Cancelling task %s", task.ID)
+ // TODO: Implement actual task cancellation logic
+ } else {
+ glog.Warningf("Cannot cancel task %s: task not found", taskID)
+ }
+
+ }
+ }
+}
+
+func (w *Worker) getTaskLoad() int {
+ respCh := make(chan int, 1)
+ w.cmds <- workerCommand{
+ action: ActionGetTaskLoad,
+ data: respCh,
+ resp: nil,
+ }
+ return <-respCh
+}
+
+func (w *Worker) setTask(task *types.TaskInput) error {
+ resp := make(chan error)
+ w.cmds <- workerCommand{
+ action: ActionSetTask,
+ data: task,
+ resp: resp,
+ }
+ if err := <-resp; err != nil {
+ glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
+ w.id, w.getTaskLoad(), w.config.MaxConcurrent, task.ID)
+ return err
+ }
+ newLoad := w.getTaskLoad()
+
+ glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
+ w.id, task.ID, newLoad, w.config.MaxConcurrent)
+ return nil
+}
+
+func (w *Worker) removeTask(task *types.TaskInput) int {
+ w.cmds <- workerCommand{
+ action: ActionRemoveTask,
+ data: task.ID,
+ }
+ return w.getTaskLoad()
+}
+
+func (w *Worker) getAdmin() AdminClient {
+ respCh := make(chan AdminClient, 1)
+ w.cmds <- workerCommand{
+ action: ActionGetAdmin,
+ data: respCh,
+ }
+ return <-respCh
+}
+
+func (w *Worker) getStopChan() chan struct{} {
+ respCh := make(chan chan struct{}, 1)
+ w.cmds <- workerCommand{
+ action: ActionGetStopChan,
+ data: respCh,
+ }
+ return <-respCh
+}
+
+func (w *Worker) getHbTick() *time.Ticker {
+ respCh := make(chan *time.Ticker, 1)
+ w.cmds <- workerCommand{
+ action: ActionGetHbTick,
+ data: respCh,
+ }
+ return <-respCh
+}
+
+func (w *Worker) getReqTick() *time.Ticker {
+ respCh := make(chan *time.Ticker, 1)
+ w.cmds <- workerCommand{
+ action: ActionGetReqTick,
+ data: respCh,
+ }
+ return <-respCh
+}
+
+func (w *Worker) setHbTick(tick *time.Ticker) *time.Ticker {
+ w.cmds <- workerCommand{
+ action: ActionSetHbTick,
+ data: tick,
+ }
+ return w.getHbTick()
+}
+
+func (w *Worker) setReqTick(tick *time.Ticker) *time.Ticker {
+ w.cmds <- workerCommand{
+ action: ActionSetReqTick,
+ data: tick,
+ }
+ return w.getReqTick()
+}
+
+func (w *Worker) getStartTime() time.Time {
+ respCh := make(chan time.Time, 1)
+ w.cmds <- workerCommand{
+ action: ActionGetStartTime,
+ data: respCh,
+ }
+ return <-respCh
+}
+func (w *Worker) getCompletedTasks() int {
+ respCh := make(chan int, 1)
+ w.cmds <- workerCommand{
+ action: ActionGetCompletedTasks,
+ data: respCh,
+ }
+ return <-respCh
+}
+func (w *Worker) getFailedTasks() int {
+ respCh := make(chan int, 1)
+ w.cmds <- workerCommand{
+ action: ActionGetFailedTasks,
+ data: respCh,
+ }
+ return <-respCh
+}
+
// getTaskLoggerConfig returns the task logger configuration with worker's log directory
func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig {
config := tasks.DefaultTaskLoggerConfig()
@@ -177,21 +418,29 @@ func (w *Worker) ID() string {
return w.id
}
-// Start starts the worker
func (w *Worker) Start() error {
- w.mutex.Lock()
- defer w.mutex.Unlock()
+ resp := make(chan error)
+ w.cmds <- workerCommand{
+ action: ActionStart,
+ resp: resp,
+ }
+ return <-resp
+}
- if w.running {
- return fmt.Errorf("worker is already running")
+// handleStart starts the worker inside the manager loop
+func (w *Worker) handleStart(cmd workerCommand) {
+ if w.state.running {
+ cmd.resp <- fmt.Errorf("worker is already running")
+ return
}
- if w.adminClient == nil {
- return fmt.Errorf("admin client is not set")
+ if w.state.adminClient == nil {
+ cmd.resp <- fmt.Errorf("admin client is not set")
+ return
}
- w.running = true
- w.startTime = time.Now()
+ w.state.running = true
+ w.state.startTime = time.Now()
// Prepare worker info for registration
workerInfo := &types.WorkerData{
@@ -204,7 +453,7 @@ func (w *Worker) Start() error {
}
// Register worker info with client first (this stores it for use during connection)
- if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
+ if err := w.state.adminClient.RegisterWorker(workerInfo); err != nil {
glog.V(1).Infof("Worker info stored for registration: %v", err)
// This is expected if not connected yet
}
@@ -214,7 +463,7 @@ func (w *Worker) Start() error {
w.id, w.config.Capabilities, w.config.MaxConcurrent)
// Try initial connection, but don't fail if it doesn't work immediately
- if err := w.adminClient.Connect(); err != nil {
+ if err := w.state.adminClient.Connect(); err != nil {
glog.Warningf("INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err)
// Don't return error - let the reconnection loop handle it
} else {
@@ -230,54 +479,67 @@ func (w *Worker) Start() error {
go w.messageProcessingLoop()
glog.Infof("WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id)
- return nil
+ cmd.resp <- nil
}
-// Stop stops the worker
func (w *Worker) Stop() error {
- w.mutex.Lock()
- defer w.mutex.Unlock()
-
- if !w.running {
- return nil
- }
-
- w.running = false
- close(w.stopChan)
-
- // Stop tickers
- if w.heartbeatTicker != nil {
- w.heartbeatTicker.Stop()
+ resp := make(chan error)
+ w.cmds <- workerCommand{
+ action: ActionStop,
+ resp: resp,
}
- if w.requestTicker != nil {
- w.requestTicker.Stop()
+ if err := <-resp; err != nil {
+ return err
}
- // Wait for current tasks to complete or timeout
+ // Wait for tasks to finish
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()
-
- for len(w.currentTasks) > 0 {
+ for w.getTaskLoad() > 0 {
select {
case <-timeout.C:
- glog.Warningf("Worker %s stopping with %d tasks still running", w.id, len(w.currentTasks))
- break
- case <-time.After(time.Second):
- // Check again
+ glog.Warningf("Worker %s stopping with %d tasks still running", w.id, w.getTaskLoad())
+ goto end_wait
+ case <-time.After(100 * time.Millisecond):
}
}
+end_wait:
// Disconnect from admin server
- if w.adminClient != nil {
- if err := w.adminClient.Disconnect(); err != nil {
+ if adminClient := w.getAdmin(); adminClient != nil {
+ if err := adminClient.Disconnect(); err != nil {
glog.Errorf("Error disconnecting from admin server: %v", err)
}
}
+ w.closeOnce.Do(func() {
+ close(w.cmds)
+ })
glog.Infof("Worker %s stopped", w.id)
return nil
}
+// handleStop stops the worker inside the manager loop
+func (w *Worker) handleStop(cmd workerCommand) {
+ if !w.state.running {
+ cmd.resp <- nil
+ return
+ }
+
+ w.state.running = false
+ close(w.state.stopChan)
+
+ // Stop tickers
+ if w.state.heartbeatTicker != nil {
+ w.state.heartbeatTicker.Stop()
+ }
+ if w.state.requestTicker != nil {
+ w.state.requestTicker.Stop()
+ }
+
+ cmd.resp <- nil
+}
+
// RegisterTask registers a task factory
func (w *Worker) RegisterTask(taskType types.TaskType, factory types.TaskFactory) {
w.registry.Register(taskType, factory)
@@ -290,31 +552,13 @@ func (w *Worker) GetCapabilities() []types.TaskType {
// GetStatus returns the current worker status
func (w *Worker) GetStatus() types.WorkerStatus {
- w.mutex.RLock()
- defer w.mutex.RUnlock()
-
- var currentTasks []types.TaskInput
- for _, task := range w.currentTasks {
- currentTasks = append(currentTasks, *task)
- }
-
- status := "active"
- if len(w.currentTasks) >= w.config.MaxConcurrent {
- status = "busy"
- }
-
- return types.WorkerStatus{
- WorkerID: w.id,
- Status: status,
- Capabilities: w.config.Capabilities,
- MaxConcurrent: w.config.MaxConcurrent,
- CurrentLoad: len(w.currentTasks),
- LastHeartbeat: time.Now(),
- CurrentTasks: currentTasks,
- Uptime: time.Since(w.startTime),
- TasksCompleted: w.tasksCompleted,
- TasksFailed: w.tasksFailed,
+ respCh := make(statusResponse, 1)
+ w.cmds <- workerCommand{
+ action: ActionGetStatus,
+ data: respCh,
+ resp: nil,
}
+ return <-respCh
}
// HandleTask handles a task execution
@@ -322,22 +566,10 @@ func (w *Worker) HandleTask(task *types.TaskInput) error {
glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)",
w.id, task.ID, task.Type, task.VolumeID)
- w.mutex.Lock()
- currentLoad := len(w.currentTasks)
- if currentLoad >= w.config.MaxConcurrent {
- w.mutex.Unlock()
- glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
- w.id, currentLoad, w.config.MaxConcurrent, task.ID)
- return fmt.Errorf("worker is at capacity")
+ if err := w.setTask(task); err != nil {
+ return err
}
- w.currentTasks[task.ID] = task
- newLoad := len(w.currentTasks)
- w.mutex.Unlock()
-
- glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
- w.id, task.ID, newLoad, w.config.MaxConcurrent)
-
// Execute task in goroutine
go w.executeTask(task)
@@ -366,7 +598,10 @@ func (w *Worker) SetTaskRequestInterval(interval time.Duration) {
// SetAdminClient sets the admin client
func (w *Worker) SetAdminClient(client AdminClient) {
- w.adminClient = client
+ w.cmds <- workerCommand{
+ action: ActionSetAdmin,
+ data: client,
+ }
}
// executeTask executes a task
@@ -374,10 +609,7 @@ func (w *Worker) executeTask(task *types.TaskInput) {
startTime := time.Now()
defer func() {
- w.mutex.Lock()
- delete(w.currentTasks, task.ID)
- currentLoad := len(w.currentTasks)
- w.mutex.Unlock()
+ currentLoad := w.removeTask(task)
duration := time.Since(startTime)
glog.Infof("TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d",
@@ -388,7 +620,7 @@ func (w *Worker) executeTask(task *types.TaskInput) {
w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339))
// Report task start to admin server
- if err := w.adminClient.UpdateTaskProgress(task.ID, 0.0); err != nil {
+ if err := w.getAdmin().UpdateTaskProgress(task.ID, 0.0); err != nil {
glog.V(1).Infof("Failed to report task start to admin: %v", err)
}
@@ -461,7 +693,7 @@ func (w *Worker) executeTask(task *types.TaskInput) {
taskInstance.SetProgressCallback(func(progress float64, stage string) {
// Report progress updates to admin server
glog.V(2).Infof("Task %s progress: %.1f%% - %s", task.ID, progress, stage)
- if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil {
+ if err := w.getAdmin().UpdateTaskProgress(task.ID, progress); err != nil {
glog.V(1).Infof("Failed to report task progress to admin: %v", err)
}
if fileLogger != nil {
@@ -481,7 +713,9 @@ func (w *Worker) executeTask(task *types.TaskInput) {
// Report completion
if err != nil {
w.completeTask(task.ID, false, err.Error())
- w.tasksFailed++
+ w.cmds <- workerCommand{
+ action: ActionIncTaskFail,
+ }
glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
if fileLogger != nil {
fileLogger.LogStatus("failed", err.Error())
@@ -489,18 +723,21 @@ func (w *Worker) executeTask(task *types.TaskInput) {
}
} else {
w.completeTask(task.ID, true, "")
- w.tasksCompleted++
+ w.cmds <- workerCommand{
+ action: ActionIncTaskComplete,
+ }
glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
if fileLogger != nil {
fileLogger.Info("Task %s completed successfully", task.ID)
}
}
+ return
}
// completeTask reports task completion to admin server
func (w *Worker) completeTask(taskID string, success bool, errorMsg string) {
- if w.adminClient != nil {
- if err := w.adminClient.CompleteTask(taskID, success, errorMsg); err != nil {
+ if w.getAdmin() != nil {
+ if err := w.getAdmin().CompleteTask(taskID, success, errorMsg); err != nil {
glog.Errorf("Failed to report task completion: %v", err)
}
}
@@ -508,14 +745,14 @@ func (w *Worker) completeTask(taskID string, success bool, errorMsg string) {
// heartbeatLoop sends periodic heartbeats to the admin server
func (w *Worker) heartbeatLoop() {
- w.heartbeatTicker = time.NewTicker(w.config.HeartbeatInterval)
- defer w.heartbeatTicker.Stop()
-
+ defer w.setHbTick(time.NewTicker(w.config.HeartbeatInterval)).Stop()
+ ticker := w.getHbTick()
+ stopChan := w.getStopChan()
for {
select {
- case <-w.stopChan:
+ case <-stopChan:
return
- case <-w.heartbeatTicker.C:
+ case <-ticker.C:
w.sendHeartbeat()
}
}
@@ -523,14 +760,14 @@ func (w *Worker) heartbeatLoop() {
// taskRequestLoop periodically requests new tasks from the admin server
func (w *Worker) taskRequestLoop() {
- w.requestTicker = time.NewTicker(w.config.TaskRequestInterval)
- defer w.requestTicker.Stop()
-
+ defer w.setReqTick(time.NewTicker(w.config.TaskRequestInterval)).Stop()
+ ticker := w.getReqTick()
+ stopChan := w.getStopChan()
for {
select {
- case <-w.stopChan:
+ case <-stopChan:
return
- case <-w.requestTicker.C:
+ case <-ticker.C:
w.requestTasks()
}
}
@@ -538,13 +775,13 @@ func (w *Worker) taskRequestLoop() {
// sendHeartbeat sends heartbeat to admin server
func (w *Worker) sendHeartbeat() {
- if w.adminClient != nil {
- if err := w.adminClient.SendHeartbeat(w.id, &types.WorkerStatus{
+ if w.getAdmin() != nil {
+ if err := w.getAdmin().SendHeartbeat(w.id, &types.WorkerStatus{
WorkerID: w.id,
Status: "active",
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
- CurrentLoad: len(w.currentTasks),
+ CurrentLoad: w.getTaskLoad(),
LastHeartbeat: time.Now(),
}); err != nil {
glog.Warningf("Failed to send heartbeat: %v", err)
@@ -554,9 +791,7 @@ func (w *Worker) sendHeartbeat() {
// requestTasks requests new tasks from the admin server
func (w *Worker) requestTasks() {
- w.mutex.RLock()
- currentLoad := len(w.currentTasks)
- w.mutex.RUnlock()
+ currentLoad := w.getTaskLoad()
if currentLoad >= w.config.MaxConcurrent {
glog.V(3).Infof("TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)",
@@ -564,11 +799,11 @@ func (w *Worker) requestTasks() {
return // Already at capacity
}
- if w.adminClient != nil {
+ if w.getAdmin() != nil {
glog.V(3).Infof("REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)",
w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities)
- task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities)
+ task, err := w.getAdmin().RequestTask(w.id, w.config.Capabilities)
if err != nil {
glog.V(2).Infof("TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err)
return
@@ -591,18 +826,6 @@ func (w *Worker) GetTaskRegistry() *tasks.TaskRegistry {
return w.registry
}
-// GetCurrentTasks returns the current tasks
-func (w *Worker) GetCurrentTasks() map[string]*types.TaskInput {
- w.mutex.RLock()
- defer w.mutex.RUnlock()
-
- tasks := make(map[string]*types.TaskInput)
- for id, task := range w.currentTasks {
- tasks[id] = task
- }
- return tasks
-}
-
// registerWorker registers the worker with the admin server
func (w *Worker) registerWorker() {
workerInfo := &types.WorkerData{
@@ -614,7 +837,7 @@ func (w *Worker) registerWorker() {
LastHeartbeat: time.Now(),
}
- if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
+ if err := w.getAdmin().RegisterWorker(workerInfo); err != nil {
glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err)
} else {
glog.Infof("Worker %s registered successfully with admin server", w.id)
@@ -627,15 +850,15 @@ func (w *Worker) connectionMonitorLoop() {
defer ticker.Stop()
lastConnectionStatus := false
-
+ stopChan := w.getStopChan()
for {
select {
- case <-w.stopChan:
+ case <-stopChan:
glog.V(1).Infof("CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id)
return
case <-ticker.C:
// Monitor connection status and log changes
- currentConnectionStatus := w.adminClient != nil && w.adminClient.IsConnected()
+ currentConnectionStatus := w.getAdmin() != nil && w.getAdmin().IsConnected()
if currentConnectionStatus != lastConnectionStatus {
if currentConnectionStatus {
@@ -662,19 +885,17 @@ func (w *Worker) GetConfig() *types.WorkerConfig {
// GetPerformanceMetrics returns performance metrics
func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
- w.mutex.RLock()
- defer w.mutex.RUnlock()
- uptime := time.Since(w.startTime)
+ uptime := time.Since(w.getStartTime())
var successRate float64
- totalTasks := w.tasksCompleted + w.tasksFailed
+ totalTasks := w.getCompletedTasks() + w.getFailedTasks()
if totalTasks > 0 {
- successRate = float64(w.tasksCompleted) / float64(totalTasks) * 100
+ successRate = float64(w.getCompletedTasks()) / float64(totalTasks) * 100
}
return &types.WorkerPerformance{
- TasksCompleted: w.tasksCompleted,
- TasksFailed: w.tasksFailed,
+ TasksCompleted: w.getCompletedTasks(),
+ TasksFailed: w.getFailedTasks(),
AverageTaskTime: 0, // Would need to track this
Uptime: uptime,
SuccessRate: successRate,
@@ -686,7 +907,7 @@ func (w *Worker) messageProcessingLoop() {
glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)
// Get access to the incoming message channel from gRPC client
- grpcClient, ok := w.adminClient.(*GrpcAdminClient)
+ grpcClient, ok := w.getAdmin().(*GrpcAdminClient)
if !ok {
glog.Warningf("MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id)
return
@@ -694,10 +915,10 @@ func (w *Worker) messageProcessingLoop() {
incomingChan := grpcClient.GetIncomingChannel()
glog.V(1).Infof("MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id)
-
+ stopChan := w.getStopChan()
for {
select {
- case <-w.stopChan:
+ case <-stopChan:
glog.Infof("MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id)
return
case message := <-incomingChan:
@@ -773,7 +994,7 @@ func (w *Worker) handleTaskLogRequest(request *worker_pb.TaskLogRequest) {
},
}
- grpcClient, ok := w.adminClient.(*GrpcAdminClient)
+ grpcClient, ok := w.getAdmin().(*GrpcAdminClient)
if !ok {
glog.Errorf("Cannot send task log response: admin client is not gRPC client")
return
@@ -791,14 +1012,10 @@ func (w *Worker) handleTaskLogRequest(request *worker_pb.TaskLogRequest) {
func (w *Worker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) {
glog.Infof("Worker %s received task cancellation for task %s", w.id, cancellation.TaskId)
- w.mutex.Lock()
- defer w.mutex.Unlock()
-
- if task, exists := w.currentTasks[cancellation.TaskId]; exists {
- // TODO: Implement task cancellation logic
- glog.Infof("Cancelling task %s", task.ID)
- } else {
- glog.Warningf("Cannot cancel task %s: task not found", cancellation.TaskId)
+ w.cmds <- workerCommand{
+ action: ActionCancelTask,
+ data: cancellation.TaskId,
+ resp: nil,
}
}