aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/client.go')
-rw-r--r--weed/worker/client.go761
1 files changed, 761 insertions, 0 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go
new file mode 100644
index 000000000..f9b42087c
--- /dev/null
+++ b/weed/worker/client.go
@@ -0,0 +1,761 @@
+package worker
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+ "google.golang.org/grpc"
+)
+
+// GrpcAdminClient implements AdminClient using gRPC bidirectional streaming
+type GrpcAdminClient struct {
+ adminAddress string
+ workerID string
+ dialOption grpc.DialOption
+
+ conn *grpc.ClientConn
+ client worker_pb.WorkerServiceClient
+ stream worker_pb.WorkerService_WorkerStreamClient
+ streamCtx context.Context
+ streamCancel context.CancelFunc
+
+ connected bool
+ reconnecting bool
+ shouldReconnect bool
+ mutex sync.RWMutex
+
+ // Reconnection parameters
+ maxReconnectAttempts int
+ reconnectBackoff time.Duration
+ maxReconnectBackoff time.Duration
+ reconnectMultiplier float64
+
+ // Worker registration info for re-registration after reconnection
+ lastWorkerInfo *types.Worker
+
+ // Channels for communication
+ outgoing chan *worker_pb.WorkerMessage
+ incoming chan *worker_pb.AdminMessage
+ responseChans map[string]chan *worker_pb.AdminMessage
+ responsesMutex sync.RWMutex
+
+ // Shutdown channel
+ shutdownChan chan struct{}
+}
+
+// NewGrpcAdminClient creates a new gRPC admin client
+func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.DialOption) *GrpcAdminClient {
+ // Admin uses HTTP port + 10000 as gRPC port
+ grpcAddress := pb.ServerToGrpcAddress(adminAddress)
+
+ return &GrpcAdminClient{
+ adminAddress: grpcAddress,
+ workerID: workerID,
+ dialOption: dialOption,
+ shouldReconnect: true,
+ maxReconnectAttempts: 0, // 0 means infinite attempts
+ reconnectBackoff: 1 * time.Second,
+ maxReconnectBackoff: 30 * time.Second,
+ reconnectMultiplier: 1.5,
+ outgoing: make(chan *worker_pb.WorkerMessage, 100),
+ incoming: make(chan *worker_pb.AdminMessage, 100),
+ responseChans: make(map[string]chan *worker_pb.AdminMessage),
+ shutdownChan: make(chan struct{}),
+ }
+}
+
+// Connect establishes gRPC connection to admin server with TLS detection
+func (c *GrpcAdminClient) Connect() error {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
+ if c.connected {
+ return fmt.Errorf("already connected")
+ }
+
+ // Detect TLS support and create appropriate connection
+ conn, err := c.createConnection()
+ if err != nil {
+ return fmt.Errorf("failed to connect to admin server: %v", err)
+ }
+
+ c.conn = conn
+ c.client = worker_pb.NewWorkerServiceClient(conn)
+
+ // Create bidirectional stream
+ c.streamCtx, c.streamCancel = context.WithCancel(context.Background())
+ stream, err := c.client.WorkerStream(c.streamCtx)
+ if err != nil {
+ c.conn.Close()
+ return fmt.Errorf("failed to create worker stream: %v", err)
+ }
+
+ c.stream = stream
+ c.connected = true
+
+ // Start stream handlers and reconnection loop
+ go c.handleOutgoing()
+ go c.handleIncoming()
+ go c.reconnectionLoop()
+
+ glog.Infof("Connected to admin server at %s", c.adminAddress)
+ return nil
+}
+
+// createConnection attempts to connect using the provided dial option
+func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption)
+ if err != nil {
+ return nil, fmt.Errorf("failed to connect to admin server: %v", err)
+ }
+
+ glog.Infof("Connected to admin server at %s", c.adminAddress)
+ return conn, nil
+}
+
+// Disconnect closes the gRPC connection
+func (c *GrpcAdminClient) Disconnect() error {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
+ if !c.connected {
+ return nil
+ }
+
+ c.connected = false
+ c.shouldReconnect = false
+
+ // Send shutdown signal to stop reconnection loop
+ select {
+ case c.shutdownChan <- struct{}{}:
+ default:
+ }
+
+ // Send shutdown message
+ shutdownMsg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_Shutdown{
+ Shutdown: &worker_pb.WorkerShutdown{
+ WorkerId: c.workerID,
+ Reason: "normal shutdown",
+ },
+ },
+ }
+
+ select {
+ case c.outgoing <- shutdownMsg:
+ case <-time.After(time.Second):
+ glog.Warningf("Failed to send shutdown message")
+ }
+
+ // Cancel stream context
+ if c.streamCancel != nil {
+ c.streamCancel()
+ }
+
+ // Close stream
+ if c.stream != nil {
+ c.stream.CloseSend()
+ }
+
+ // Close connection
+ if c.conn != nil {
+ c.conn.Close()
+ }
+
+ // Close channels
+ close(c.outgoing)
+ close(c.incoming)
+
+ glog.Infof("Disconnected from admin server")
+ return nil
+}
+
+// reconnectionLoop handles automatic reconnection with exponential backoff
+func (c *GrpcAdminClient) reconnectionLoop() {
+ backoff := c.reconnectBackoff
+ attempts := 0
+
+ for {
+ select {
+ case <-c.shutdownChan:
+ return
+ default:
+ }
+
+ c.mutex.RLock()
+ shouldReconnect := c.shouldReconnect && !c.connected && !c.reconnecting
+ c.mutex.RUnlock()
+
+ if !shouldReconnect {
+ time.Sleep(time.Second)
+ continue
+ }
+
+ c.mutex.Lock()
+ c.reconnecting = true
+ c.mutex.Unlock()
+
+ glog.Infof("Attempting to reconnect to admin server (attempt %d)", attempts+1)
+
+ // Attempt to reconnect
+ if err := c.reconnect(); err != nil {
+ attempts++
+ glog.Errorf("Reconnection attempt %d failed: %v", attempts, err)
+
+ // Reset reconnecting flag
+ c.mutex.Lock()
+ c.reconnecting = false
+ c.mutex.Unlock()
+
+ // Check if we should give up
+ if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts {
+ glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts)
+ c.mutex.Lock()
+ c.shouldReconnect = false
+ c.mutex.Unlock()
+ return
+ }
+
+ // Wait with exponential backoff
+ glog.Infof("Waiting %v before next reconnection attempt", backoff)
+
+ select {
+ case <-c.shutdownChan:
+ return
+ case <-time.After(backoff):
+ }
+
+ // Increase backoff
+ backoff = time.Duration(float64(backoff) * c.reconnectMultiplier)
+ if backoff > c.maxReconnectBackoff {
+ backoff = c.maxReconnectBackoff
+ }
+ } else {
+ // Successful reconnection
+ attempts = 0
+ backoff = c.reconnectBackoff
+ glog.Infof("Successfully reconnected to admin server")
+
+ c.mutex.Lock()
+ c.reconnecting = false
+ c.mutex.Unlock()
+ }
+ }
+}
+
+// reconnect attempts to re-establish the connection
+func (c *GrpcAdminClient) reconnect() error {
+ // Clean up existing connection completely
+ c.mutex.Lock()
+ if c.streamCancel != nil {
+ c.streamCancel()
+ }
+ if c.stream != nil {
+ c.stream.CloseSend()
+ }
+ if c.conn != nil {
+ c.conn.Close()
+ }
+ c.mutex.Unlock()
+
+ // Create new connection
+ conn, err := c.createConnection()
+ if err != nil {
+ return fmt.Errorf("failed to create connection: %v", err)
+ }
+
+ client := worker_pb.NewWorkerServiceClient(conn)
+
+ // Create new stream
+ streamCtx, streamCancel := context.WithCancel(context.Background())
+ stream, err := client.WorkerStream(streamCtx)
+ if err != nil {
+ conn.Close()
+ streamCancel()
+ return fmt.Errorf("failed to create stream: %v", err)
+ }
+
+ // Update client state
+ c.mutex.Lock()
+ c.conn = conn
+ c.client = client
+ c.stream = stream
+ c.streamCtx = streamCtx
+ c.streamCancel = streamCancel
+ c.connected = true
+ c.mutex.Unlock()
+
+ // Restart stream handlers
+ go c.handleOutgoing()
+ go c.handleIncoming()
+
+ // Re-register worker if we have previous registration info
+ c.mutex.RLock()
+ workerInfo := c.lastWorkerInfo
+ c.mutex.RUnlock()
+
+ if workerInfo != nil {
+ glog.Infof("Re-registering worker after reconnection...")
+ if err := c.sendRegistration(workerInfo); err != nil {
+ glog.Errorf("Failed to re-register worker: %v", err)
+ // Don't fail the reconnection because of registration failure
+ // The registration will be retried on next heartbeat or operation
+ }
+ }
+
+ return nil
+}
+
+// handleOutgoing processes outgoing messages to admin
+func (c *GrpcAdminClient) handleOutgoing() {
+ for msg := range c.outgoing {
+ c.mutex.RLock()
+ connected := c.connected
+ stream := c.stream
+ c.mutex.RUnlock()
+
+ if !connected {
+ break
+ }
+
+ if err := stream.Send(msg); err != nil {
+ glog.Errorf("Failed to send message to admin: %v", err)
+ c.mutex.Lock()
+ c.connected = false
+ c.mutex.Unlock()
+ break
+ }
+ }
+}
+
+// handleIncoming processes incoming messages from admin
+func (c *GrpcAdminClient) handleIncoming() {
+ for {
+ c.mutex.RLock()
+ connected := c.connected
+ stream := c.stream
+ c.mutex.RUnlock()
+
+ if !connected {
+ break
+ }
+
+ msg, err := stream.Recv()
+ if err != nil {
+ if err == io.EOF {
+ glog.Infof("Admin server closed the stream")
+ } else {
+ glog.Errorf("Failed to receive message from admin: %v", err)
+ }
+ c.mutex.Lock()
+ c.connected = false
+ c.mutex.Unlock()
+ break
+ }
+
+ // Route message to waiting goroutines or general handler
+ select {
+ case c.incoming <- msg:
+ case <-time.After(time.Second):
+ glog.Warningf("Incoming message buffer full, dropping message")
+ }
+ }
+}
+
+// RegisterWorker registers the worker with the admin server
+func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error {
+ if !c.connected {
+ return fmt.Errorf("not connected to admin server")
+ }
+
+ // Store worker info for re-registration after reconnection
+ c.mutex.Lock()
+ c.lastWorkerInfo = worker
+ c.mutex.Unlock()
+
+ return c.sendRegistration(worker)
+}
+
+// sendRegistration sends the registration message and waits for response
+func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error {
+ capabilities := make([]string, len(worker.Capabilities))
+ for i, cap := range worker.Capabilities {
+ capabilities[i] = string(cap)
+ }
+
+ msg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_Registration{
+ Registration: &worker_pb.WorkerRegistration{
+ WorkerId: c.workerID,
+ Address: worker.Address,
+ Capabilities: capabilities,
+ MaxConcurrent: int32(worker.MaxConcurrent),
+ Metadata: make(map[string]string),
+ },
+ },
+ }
+
+ select {
+ case c.outgoing <- msg:
+ case <-time.After(5 * time.Second):
+ return fmt.Errorf("failed to send registration message: timeout")
+ }
+
+ // Wait for registration response
+ timeout := time.NewTimer(10 * time.Second)
+ defer timeout.Stop()
+
+ for {
+ select {
+ case response := <-c.incoming:
+ if regResp := response.GetRegistrationResponse(); regResp != nil {
+ if regResp.Success {
+ glog.Infof("Worker registered successfully: %s", regResp.Message)
+ return nil
+ }
+ return fmt.Errorf("registration failed: %s", regResp.Message)
+ }
+ case <-timeout.C:
+ return fmt.Errorf("registration timeout")
+ }
+ }
+}
+
+// SendHeartbeat sends heartbeat to admin server
+func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
+ if !c.connected {
+ // Wait for reconnection for a short time
+ if err := c.waitForConnection(10 * time.Second); err != nil {
+ return fmt.Errorf("not connected to admin server: %v", err)
+ }
+ }
+
+ taskIds := make([]string, len(status.CurrentTasks))
+ for i, task := range status.CurrentTasks {
+ taskIds[i] = task.ID
+ }
+
+ msg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_Heartbeat{
+ Heartbeat: &worker_pb.WorkerHeartbeat{
+ WorkerId: c.workerID,
+ Status: status.Status,
+ CurrentLoad: int32(status.CurrentLoad),
+ MaxConcurrent: int32(status.MaxConcurrent),
+ CurrentTaskIds: taskIds,
+ TasksCompleted: int32(status.TasksCompleted),
+ TasksFailed: int32(status.TasksFailed),
+ UptimeSeconds: int64(status.Uptime.Seconds()),
+ },
+ },
+ }
+
+ select {
+ case c.outgoing <- msg:
+ return nil
+ case <-time.After(time.Second):
+ return fmt.Errorf("failed to send heartbeat: timeout")
+ }
+}
+
+// RequestTask requests a new task from admin server
+func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) {
+ if !c.connected {
+ // Wait for reconnection for a short time
+ if err := c.waitForConnection(5 * time.Second); err != nil {
+ return nil, fmt.Errorf("not connected to admin server: %v", err)
+ }
+ }
+
+ caps := make([]string, len(capabilities))
+ for i, cap := range capabilities {
+ caps[i] = string(cap)
+ }
+
+ msg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_TaskRequest{
+ TaskRequest: &worker_pb.TaskRequest{
+ WorkerId: c.workerID,
+ Capabilities: caps,
+ AvailableSlots: 1, // Request one task
+ },
+ },
+ }
+
+ select {
+ case c.outgoing <- msg:
+ case <-time.After(time.Second):
+ return nil, fmt.Errorf("failed to send task request: timeout")
+ }
+
+ // Wait for task assignment
+ timeout := time.NewTimer(5 * time.Second)
+ defer timeout.Stop()
+
+ for {
+ select {
+ case response := <-c.incoming:
+ if taskAssign := response.GetTaskAssignment(); taskAssign != nil {
+ // Convert parameters map[string]string to map[string]interface{}
+ parameters := make(map[string]interface{})
+ for k, v := range taskAssign.Params.Parameters {
+ parameters[k] = v
+ }
+
+ // Convert to our task type
+ task := &types.Task{
+ ID: taskAssign.TaskId,
+ Type: types.TaskType(taskAssign.TaskType),
+ Status: types.TaskStatusAssigned,
+ VolumeID: taskAssign.Params.VolumeId,
+ Server: taskAssign.Params.Server,
+ Collection: taskAssign.Params.Collection,
+ Priority: types.TaskPriority(taskAssign.Priority),
+ CreatedAt: time.Unix(taskAssign.CreatedTime, 0),
+ Parameters: parameters,
+ }
+ return task, nil
+ }
+ case <-timeout.C:
+ return nil, nil // No task available
+ }
+ }
+}
+
+// CompleteTask reports task completion to admin server
+func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
+ if !c.connected {
+ // Wait for reconnection for a short time
+ if err := c.waitForConnection(5 * time.Second); err != nil {
+ return fmt.Errorf("not connected to admin server: %v", err)
+ }
+ }
+
+ msg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_TaskComplete{
+ TaskComplete: &worker_pb.TaskComplete{
+ TaskId: taskID,
+ WorkerId: c.workerID,
+ Success: success,
+ ErrorMessage: errorMsg,
+ CompletionTime: time.Now().Unix(),
+ },
+ },
+ }
+
+ select {
+ case c.outgoing <- msg:
+ return nil
+ case <-time.After(time.Second):
+ return fmt.Errorf("failed to send task completion: timeout")
+ }
+}
+
+// UpdateTaskProgress updates task progress to admin server
+func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
+ if !c.connected {
+ // Wait for reconnection for a short time
+ if err := c.waitForConnection(5 * time.Second); err != nil {
+ return fmt.Errorf("not connected to admin server: %v", err)
+ }
+ }
+
+ msg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_TaskUpdate{
+ TaskUpdate: &worker_pb.TaskUpdate{
+ TaskId: taskID,
+ WorkerId: c.workerID,
+ Status: "in_progress",
+ Progress: float32(progress),
+ },
+ },
+ }
+
+ select {
+ case c.outgoing <- msg:
+ return nil
+ case <-time.After(time.Second):
+ return fmt.Errorf("failed to send task progress: timeout")
+ }
+}
+
+// IsConnected returns whether the client is connected
+func (c *GrpcAdminClient) IsConnected() bool {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ return c.connected
+}
+
+// IsReconnecting returns whether the client is currently attempting to reconnect
+func (c *GrpcAdminClient) IsReconnecting() bool {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ return c.reconnecting
+}
+
+// SetReconnectionSettings allows configuration of reconnection behavior
+func (c *GrpcAdminClient) SetReconnectionSettings(maxAttempts int, initialBackoff, maxBackoff time.Duration, multiplier float64) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ c.maxReconnectAttempts = maxAttempts
+ c.reconnectBackoff = initialBackoff
+ c.maxReconnectBackoff = maxBackoff
+ c.reconnectMultiplier = multiplier
+}
+
+// StopReconnection stops the reconnection loop
+func (c *GrpcAdminClient) StopReconnection() {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ c.shouldReconnect = false
+}
+
+// StartReconnection starts the reconnection loop
+func (c *GrpcAdminClient) StartReconnection() {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ c.shouldReconnect = true
+}
+
+// waitForConnection waits for the connection to be established or timeout
+func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
+ deadline := time.Now().Add(timeout)
+
+ for time.Now().Before(deadline) {
+ c.mutex.RLock()
+ connected := c.connected
+ shouldReconnect := c.shouldReconnect
+ c.mutex.RUnlock()
+
+ if connected {
+ return nil
+ }
+
+ if !shouldReconnect {
+ return fmt.Errorf("reconnection is disabled")
+ }
+
+ time.Sleep(100 * time.Millisecond)
+ }
+
+ return fmt.Errorf("timeout waiting for connection")
+}
+
+// MockAdminClient provides a mock implementation for testing
+type MockAdminClient struct {
+ workerID string
+ connected bool
+ tasks []*types.Task
+ mutex sync.RWMutex
+}
+
+// NewMockAdminClient creates a new mock admin client
+func NewMockAdminClient() *MockAdminClient {
+ return &MockAdminClient{
+ connected: true,
+ tasks: make([]*types.Task, 0),
+ }
+}
+
+// Connect mock implementation
+func (m *MockAdminClient) Connect() error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ m.connected = true
+ return nil
+}
+
+// Disconnect mock implementation
+func (m *MockAdminClient) Disconnect() error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ m.connected = false
+ return nil
+}
+
+// RegisterWorker mock implementation
+func (m *MockAdminClient) RegisterWorker(worker *types.Worker) error {
+ m.workerID = worker.ID
+ glog.Infof("Mock: Worker %s registered with capabilities: %v", worker.ID, worker.Capabilities)
+ return nil
+}
+
+// SendHeartbeat mock implementation
+func (m *MockAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
+ glog.V(2).Infof("Mock: Heartbeat from worker %s, status: %s, load: %d/%d",
+ workerID, status.Status, status.CurrentLoad, status.MaxConcurrent)
+ return nil
+}
+
+// RequestTask mock implementation
+func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ if len(m.tasks) > 0 {
+ task := m.tasks[0]
+ m.tasks = m.tasks[1:]
+ glog.Infof("Mock: Assigned task %s to worker %s", task.ID, workerID)
+ return task, nil
+ }
+
+ // No tasks available
+ return nil, nil
+}
+
+// CompleteTask mock implementation
+func (m *MockAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
+ if success {
+ glog.Infof("Mock: Task %s completed successfully", taskID)
+ } else {
+ glog.Infof("Mock: Task %s failed: %s", taskID, errorMsg)
+ }
+ return nil
+}
+
+// UpdateTaskProgress mock implementation
+func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
+ glog.V(2).Infof("Mock: Task %s progress: %.1f%%", taskID, progress)
+ return nil
+}
+
+// IsConnected mock implementation
+func (m *MockAdminClient) IsConnected() bool {
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+ return m.connected
+}
+
+// AddMockTask adds a mock task for testing
+func (m *MockAdminClient) AddMockTask(task *types.Task) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ m.tasks = append(m.tasks, task)
+}
+
+// CreateAdminClient creates an admin client with the provided dial option
+func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) {
+ return NewGrpcAdminClient(adminServer, workerID, dialOption), nil
+}