aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/client.go
diff options
context:
space:
mode:
author Chris Lu <chrislusf@users.noreply.github.com> 2025-07-30 12:38:03 -0700
committer GitHub <noreply@github.com> 2025-07-30 12:38:03 -0700
commit 891a2fb6ebc324329f5330a140b8cacff3899db4 (patch)
tree d02aaa80a909e958aea831f206b3240b0237d7b7 /weed/worker/client.go
parent 64198dad8346fe284cbef944fe01ff0d062c147d (diff)
download seaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.tar.xz
seaweedfs-891a2fb6ebc324329f5330a140b8cacff3899db4.zip
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design * added simulation as tests * reorganized the codebase to move the simulation framework and tests into their own dedicated package * integration test. ec worker task * remove "enhanced" reference * start master, volume servers, filer Current Status ✅ Master: Healthy and running (port 9333) ✅ Filer: Healthy and running (port 8888) ✅ Volume Servers: All 6 servers running (ports 8080-8085) 🔄 Admin/Workers: Will start when dependencies are ready * generate write load * tasks are assigned * admin start with grpc port. worker has its own working directory * Update .gitignore * working worker and admin. Task detection is not working yet. * compiles, detection uses volumeSizeLimitMB from master * compiles * worker retries connecting to admin * build and restart * rendering pending tasks * skip task ID column * sticky worker id * test canScheduleTaskNow * worker reconnect to admin * clean up logs * worker register itself first * worker can run ec work and report status but: 1. one volume should not be repeatedly worked on. 2. ec shards need to be distributed and source data should be deleted. * move ec task logic * listing ec shards * local copy, ec. Need to distribute. 
* ec is mostly working now * distribution of ec shards needs improvement * need configuration to enable ec * show ec volumes * interval field UI component * rename * integration test with vacuuming * garbage percentage threshold * fix warning * display ec shard sizes * fix ec volumes list * Update ui.go * show default values * ensure correct default value * MaintenanceConfig use ConfigField * use schema defined defaults * config * reduce duplication * refactor to use BaseUIProvider * each task register its schema * checkECEncodingCandidate use ecDetector * use vacuumDetector * use volumeSizeLimitMB * remove remove * remove unused * refactor * use new framework * remove v2 reference * refactor * left menu can scroll now * The maintenance manager was not being initialized when no data directory was configured for persistent storage. * saving config * Update task_config_schema_templ.go * enable/disable tasks * protobuf encoded task configurations * fix system settings * use ui component * remove logs * interface{} Reduction * reduce interface{} * reduce interface{} * avoid from/to map * reduce interface{} * refactor * keep it DRY * added logging * debug messages * debug level * debug * show the log caller line * use configured task policy * log level * handle admin heartbeat response * Update worker.go * fix EC rack and dc count * Report task status to admin server * fix task logging, simplify interface checking, use erasure_coding constants * factor in empty volume server during task planning * volume.list adds disk id * track disk id also * fix locking scheduled and manual scanning * add active topology * simplify task detector * ec task completed, but shards are not showing up * implement ec in ec_typed.go * adjust log level * dedup * implementing ec copying shards and only ecx files * use disk id when distributing ec shards 🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk 📦 Task Creation: maintenance_integration.go creates ECDestination 
with DiskId 🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest 💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId]) 📂 File System: EC shards and metadata land in the exact disk directory planned * Delete original volume from all locations * clean up existing shard locations * local encoding and distributing * Update docker/admin_integration/EC-TESTING-README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * check volume id range * simplify * fix tests * fix types * clean up logs and tests --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Diffstat (limited to 'weed/worker/client.go')
-rw-r--r-- weed/worker/client.go 307
1 files changed, 241 insertions, 66 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go
index 60b33fb31..53854c6e3 100644
--- a/weed/worker/client.go
+++ b/weed/worker/client.go
@@ -80,6 +80,21 @@ func (c *GrpcAdminClient) Connect() error {
return fmt.Errorf("already connected")
}
+ // Always start the reconnection loop, even if initial connection fails
+ go c.reconnectionLoop()
+
+ // Attempt initial connection
+ err := c.attemptConnection()
+ if err != nil {
+ glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err)
+ return err
+ }
+
+ return nil
+}
+
+// attemptConnection tries to establish the connection without managing the reconnection loop
+func (c *GrpcAdminClient) attemptConnection() error {
// Detect TLS support and create appropriate connection
conn, err := c.createConnection()
if err != nil {
@@ -100,10 +115,34 @@ func (c *GrpcAdminClient) Connect() error {
c.stream = stream
c.connected = true
- // Start stream handlers and reconnection loop
- go c.handleOutgoing()
- go c.handleIncoming()
- go c.reconnectionLoop()
+ // Always check for worker info and send registration immediately as the very first message
+ c.mutex.RLock()
+ workerInfo := c.lastWorkerInfo
+ c.mutex.RUnlock()
+
+ if workerInfo != nil {
+ // Send registration synchronously as the very first message
+ if err := c.sendRegistrationSync(workerInfo); err != nil {
+ c.conn.Close()
+ c.connected = false
+ return fmt.Errorf("failed to register worker: %w", err)
+ }
+ glog.Infof("Worker registered successfully with admin server")
+ } else {
+ // No worker info yet - stream will wait for registration
+ glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
+ }
+
+ // Start stream handlers with synchronization
+ outgoingReady := make(chan struct{})
+ incomingReady := make(chan struct{})
+
+ go c.handleOutgoingWithReady(outgoingReady)
+ go c.handleIncomingWithReady(incomingReady)
+
+ // Wait for both handlers to be ready
+ <-outgoingReady
+ <-incomingReady
glog.Infof("Connected to admin server at %s", c.adminAddress)
return nil
@@ -268,53 +307,16 @@ func (c *GrpcAdminClient) reconnect() error {
if c.conn != nil {
c.conn.Close()
}
+ c.connected = false
c.mutex.Unlock()
- // Create new connection
- conn, err := c.createConnection()
+ // Attempt to re-establish connection using the same logic as initial connection
+ err := c.attemptConnection()
if err != nil {
- return fmt.Errorf("failed to create connection: %w", err)
- }
-
- client := worker_pb.NewWorkerServiceClient(conn)
-
- // Create new stream
- streamCtx, streamCancel := context.WithCancel(context.Background())
- stream, err := client.WorkerStream(streamCtx)
- if err != nil {
- conn.Close()
- streamCancel()
- return fmt.Errorf("failed to create stream: %w", err)
- }
-
- // Update client state
- c.mutex.Lock()
- c.conn = conn
- c.client = client
- c.stream = stream
- c.streamCtx = streamCtx
- c.streamCancel = streamCancel
- c.connected = true
- c.mutex.Unlock()
-
- // Restart stream handlers
- go c.handleOutgoing()
- go c.handleIncoming()
-
- // Re-register worker if we have previous registration info
- c.mutex.RLock()
- workerInfo := c.lastWorkerInfo
- c.mutex.RUnlock()
-
- if workerInfo != nil {
- glog.Infof("Re-registering worker after reconnection...")
- if err := c.sendRegistration(workerInfo); err != nil {
- glog.Errorf("Failed to re-register worker: %v", err)
- // Don't fail the reconnection because of registration failure
- // The registration will be retried on next heartbeat or operation
- }
+ return fmt.Errorf("failed to reconnect: %w", err)
}
+ // Registration is now handled in attemptConnection if worker info is available
return nil
}
@@ -340,8 +342,19 @@ func (c *GrpcAdminClient) handleOutgoing() {
}
}
+// handleOutgoingWithReady closes ready to announce that this goroutine has started, then runs handleOutgoing
+func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) {
+ // Closing (not sending on) ready releases the waiter without this handler ever blocking; note the signal fires before the outgoing loop itself begins
+ close(ready)
+
+ // Hand over to the regular outgoing loop; this call returns only when handleOutgoing returns
+ c.handleOutgoing()
+}
+
// handleIncoming processes incoming messages from admin
func (c *GrpcAdminClient) handleIncoming() {
+ glog.V(1).Infof("📡 INCOMING HANDLER STARTED: Worker %s incoming message handler started", c.workerID)
+
for {
c.mutex.RLock()
connected := c.connected
@@ -349,15 +362,17 @@ func (c *GrpcAdminClient) handleIncoming() {
c.mutex.RUnlock()
if !connected {
+ glog.V(1).Infof("🔌 INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - not connected", c.workerID)
break
}
+ glog.V(4).Infof("👂 LISTENING: Worker %s waiting for message from admin server", c.workerID)
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
- glog.Infof("Admin server closed the stream")
+ glog.Infof("🔚 STREAM CLOSED: Worker %s admin server closed the stream", c.workerID)
} else {
- glog.Errorf("Failed to receive message from admin: %v", err)
+ glog.Errorf("❌ RECEIVE ERROR: Worker %s failed to receive message from admin: %v", c.workerID, err)
}
c.mutex.Lock()
c.connected = false
@@ -365,26 +380,42 @@ func (c *GrpcAdminClient) handleIncoming() {
break
}
+ glog.V(4).Infof("📨 MESSAGE RECEIVED: Worker %s received message from admin server: %T", c.workerID, msg.Message)
+
// Route message to waiting goroutines or general handler
select {
case c.incoming <- msg:
+ glog.V(3).Infof("✅ MESSAGE ROUTED: Worker %s successfully routed message to handler", c.workerID)
case <-time.After(time.Second):
- glog.Warningf("Incoming message buffer full, dropping message")
+ glog.Warningf("🚫 MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", c.workerID, msg.Message)
}
}
+
+ glog.V(1).Infof("🏁 INCOMING HANDLER FINISHED: Worker %s incoming message handler finished", c.workerID)
+}
+
+// handleIncomingWithReady closes ready to announce that this goroutine has started, then runs handleIncoming
+func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) {
+ // Closing (not sending on) ready releases the waiter without this handler ever blocking; note the signal fires before the first stream.Recv is issued
+ close(ready)
+
+ // Hand over to the regular incoming loop; this call returns only when handleIncoming returns
+ c.handleIncoming()
}
// RegisterWorker registers the worker with the admin server
func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error {
- if !c.connected {
- return fmt.Errorf("not connected to admin server")
- }
-
// Store worker info for re-registration after reconnection
c.mutex.Lock()
c.lastWorkerInfo = worker
c.mutex.Unlock()
+ // If not connected, registration will happen when connection is established
+ if !c.connected {
+ glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection")
+ return nil
+ }
+
return c.sendRegistration(worker)
}
@@ -435,9 +466,88 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error {
}
}
+// sendRegistrationSync writes a registration message for worker directly to c.stream and waits up to 10 seconds for the admin's registration response, returning nil only when that response reports Success
+func (c *GrpcAdminClient) sendRegistrationSync(worker *types.Worker) error {
+ capabilities := make([]string, len(worker.Capabilities))
+ for i, cap := range worker.Capabilities {
+ capabilities[i] = string(cap)
+ }
+
+ msg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_Registration{
+ Registration: &worker_pb.WorkerRegistration{
+ WorkerId: c.workerID,
+ Address: worker.Address,
+ Capabilities: capabilities,
+ MaxConcurrent: int32(worker.MaxConcurrent),
+ Metadata: make(map[string]string),
+ },
+ },
+ }
+
+ // Send on the stream itself rather than through the c.outgoing channel so that registration is the first message written
+ if err := c.stream.Send(msg); err != nil {
+ return fmt.Errorf("failed to send registration message: %w", err)
+ }
+
+ // Both channels have capacity 1 so the reader goroutine below can deliver its single result and exit even after this function has returned
+ responseChan := make(chan *worker_pb.AdminMessage, 1)
+ errChan := make(chan error, 1)
+
+ // Read from the stream in a goroutine so the wait below can be bounded by a timer. NOTE(review): after a timeout this goroutine keeps calling Recv until it errors or a registration response arrives
+ go func() {
+ for {
+ response, err := c.stream.Recv()
+ if err != nil {
+ errChan <- fmt.Errorf("failed to receive registration response: %w", err)
+ return
+ }
+
+ if regResp := response.GetRegistrationResponse(); regResp != nil {
+ responseChan <- response
+ return
+ }
+ // Any other message type is discarded here and the loop keeps reading
+ }
+ }()
+
+ // Wait for whichever comes first: a registration response, a receive error, or the 10-second timer
+ timeout := time.NewTimer(10 * time.Second)
+ defer timeout.Stop()
+
+ select {
+ case response := <-responseChan:
+ if regResp := response.GetRegistrationResponse(); regResp != nil {
+ if regResp.Success {
+ glog.V(1).Infof("Worker registered successfully: %s", regResp.Message)
+ return nil
+ }
+ return fmt.Errorf("registration failed: %s", regResp.Message)
+ }
+ return fmt.Errorf("unexpected response type")
+ case err := <-errChan:
+ return err
+ case <-timeout.C:
+ return fmt.Errorf("registration timeout")
+ }
+}
+
// SendHeartbeat sends heartbeat to admin server
func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just skip the heartbeat
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("Skipping heartbeat during reconnection")
+ return nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(10 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %w", err)
@@ -477,6 +587,17 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta
// RequestTask requests a new task from admin server
func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just return no task
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("🔄 RECONNECTING: Worker %s skipping task request during reconnection", workerID)
+ return nil, nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return nil, fmt.Errorf("not connected to admin server: %w", err)
@@ -488,6 +609,9 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
caps[i] = string(cap)
}
+ glog.V(3).Infof("📤 SENDING TASK REQUEST: Worker %s sending task request to admin server with capabilities: %v",
+ workerID, capabilities)
+
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
@@ -502,23 +626,24 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
select {
case c.outgoing <- msg:
+ glog.V(3).Infof("✅ TASK REQUEST SENT: Worker %s successfully sent task request to admin server", workerID)
case <-time.After(time.Second):
+ glog.Errorf("❌ TASK REQUEST TIMEOUT: Worker %s failed to send task request: timeout", workerID)
return nil, fmt.Errorf("failed to send task request: timeout")
}
// Wait for task assignment
+ glog.V(3).Infof("⏳ WAITING FOR RESPONSE: Worker %s waiting for task assignment response (5s timeout)", workerID)
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
for {
select {
case response := <-c.incoming:
+ glog.V(3).Infof("📨 RESPONSE RECEIVED: Worker %s received response from admin server: %T", workerID, response.Message)
if taskAssign := response.GetTaskAssignment(); taskAssign != nil {
- // Convert parameters map[string]string to map[string]interface{}
- parameters := make(map[string]interface{})
- for k, v := range taskAssign.Params.Parameters {
- parameters[k] = v
- }
+ glog.V(1).Infof("Worker %s received task assignment in response: %s (type: %s, volume: %d)",
+ workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
// Convert to our task type
task := &types.Task{
@@ -530,11 +655,15 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
Collection: taskAssign.Params.Collection,
Priority: types.TaskPriority(taskAssign.Priority),
CreatedAt: time.Unix(taskAssign.CreatedTime, 0),
- Parameters: parameters,
+ // Use typed protobuf parameters directly
+ TypedParams: taskAssign.Params,
}
return task, nil
+ } else {
+ glog.V(3).Infof("📭 NON-TASK RESPONSE: Worker %s received non-task response: %T", workerID, response.Message)
}
case <-timeout.C:
+ glog.V(3).Infof("⏰ TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 5 seconds", workerID)
return nil, nil // No task available
}
}
@@ -542,24 +671,47 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
// CompleteTask reports task completion to admin server by delegating to CompleteTaskWithMetadata with nil metadata
func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
+ return c.CompleteTaskWithMetadata(taskID, success, errorMsg, nil)
+}
+
+// CompleteTaskWithMetadata reports task completion with additional metadata
+func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just skip the completion report
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID)
+ return nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %w", err)
}
}
+ taskComplete := &worker_pb.TaskComplete{
+ TaskId: taskID,
+ WorkerId: c.workerID,
+ Success: success,
+ ErrorMessage: errorMsg,
+ CompletionTime: time.Now().Unix(),
+ }
+
+ // Add metadata if provided
+ if metadata != nil {
+ taskComplete.ResultMetadata = metadata
+ }
+
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_TaskComplete{
- TaskComplete: &worker_pb.TaskComplete{
- TaskId: taskID,
- WorkerId: c.workerID,
- Success: success,
- ErrorMessage: errorMsg,
- CompletionTime: time.Now().Unix(),
- },
+ TaskComplete: taskComplete,
},
}
@@ -574,6 +726,17 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str
// UpdateTaskProgress updates task progress to admin server
func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just skip the progress update
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID)
+ return nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %w", err)
@@ -663,6 +826,12 @@ func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
return fmt.Errorf("timeout waiting for connection")
}
+// GetIncomingChannel returns c.incoming, the channel of admin messages, as a receive-only channel
+// NOTE(review): RequestTask also receives from c.incoming, so each message reaches only one of the readers - confirm callers account for this
+func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage {
+ return c.incoming
+}
+
// MockAdminClient provides a mock implementation for testing
type MockAdminClient struct {
workerID string
@@ -741,6 +910,12 @@ func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) er
return nil
}
+// CompleteTaskWithMetadata mock implementation: logs the task ID, success flag, error message and metadata, and always returns nil
+func (m *MockAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
+ glog.Infof("Mock: Task %s completed: success=%v, error=%s, metadata=%v", taskID, success, errorMsg, metadata)
+ return nil
+}
+
// IsConnected mock implementation
func (m *MockAdminClient) IsConnected() bool {
m.mutex.RLock()