diff options
Diffstat (limited to 'weed/worker/worker.go')
| -rw-r--r-- | weed/worker/worker.go | 69 |
1 files changed, 34 insertions, 35 deletions
diff --git a/weed/worker/worker.go b/weed/worker/worker.go index ccebbf011..c1ddf8b34 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -209,26 +209,26 @@ func (w *Worker) Start() error { } // Start connection attempt (will register immediately if successful) - glog.Infof("🚀 WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d", + glog.Infof("WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d", w.id, w.config.Capabilities, w.config.MaxConcurrent) // Try initial connection, but don't fail if it doesn't work immediately if err := w.adminClient.Connect(); err != nil { - glog.Warningf("⚠️ INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err) + glog.Warningf("INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err) // Don't return error - let the reconnection loop handle it } else { - glog.Infof("✅ INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id) + glog.Infof("INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id) } // Start worker loops regardless of initial connection status // They will handle connection failures gracefully - glog.V(1).Infof("🔄 STARTING LOOPS: Worker %s starting background loops", w.id) + glog.V(1).Infof("STARTING LOOPS: Worker %s starting background loops", w.id) go w.heartbeatLoop() go w.taskRequestLoop() go w.connectionMonitorLoop() go w.messageProcessingLoop() - glog.Infof("✅ WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id) + glog.Infof("WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id) return nil } @@ -325,7 +325,7 @@ func (w *Worker) HandleTask(task *types.TaskInput) error { currentLoad := len(w.currentTasks) if currentLoad >= w.config.MaxConcurrent { w.mutex.Unlock() - glog.Errorf("❌ TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s", + glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s", w.id, currentLoad, w.config.MaxConcurrent, task.ID) return fmt.Errorf("worker is at capacity") } @@ -334,7 +334,7 @@ func (w *Worker) HandleTask(task *types.TaskInput) error { newLoad := len(w.currentTasks) w.mutex.Unlock() - glog.Infof("✅ TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d", + glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d", w.id, task.ID, newLoad, w.config.MaxConcurrent) // Execute task in goroutine @@ -379,11 +379,11 @@ func (w *Worker) executeTask(task *types.TaskInput) { w.mutex.Unlock() duration := time.Since(startTime) - glog.Infof("🏁 TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d", + glog.Infof("TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d", w.id, task.ID, duration, currentLoad, w.config.MaxConcurrent) }() - glog.Infof("🚀 TASK EXECUTION STARTED: Worker %s starting execution of task %s (type: %s, volume: %d, server: %s, collection: %s) at %v", + glog.Infof("TASK EXECUTION STARTED: Worker %s starting execution of task %s (type: %s, volume: %d, server: %s, collection: %s) at %v", w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339)) // Report task start to admin server @@ -570,29 +570,29 @@ func (w *Worker) requestTasks() { w.mutex.RUnlock() if currentLoad >= w.config.MaxConcurrent { - glog.V(3).Infof("🚫 TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)", + glog.V(3).Infof("TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)", w.id, currentLoad, w.config.MaxConcurrent) return // Already at capacity } if w.adminClient != nil { - glog.V(3).Infof("📞 REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)", + glog.V(3).Infof("REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)", w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities) task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities) if err != nil { - glog.V(2).Infof("❌ TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err) + glog.V(2).Infof("TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err) return } if task != nil { - glog.Infof("📨 TASK RESPONSE RECEIVED: Worker %s received task from admin server - ID: %s, Type: %s", + glog.Infof("TASK RESPONSE RECEIVED: Worker %s received task from admin server - ID: %s, Type: %s", w.id, task.ID, task.Type) if err := w.HandleTask(task); err != nil { - glog.Errorf("❌ TASK HANDLING FAILED: Worker %s failed to handle task %s: %v", w.id, task.ID, err) + glog.Errorf("TASK HANDLING FAILED: Worker %s failed to handle task %s: %v", w.id, task.ID, err) } } else { - glog.V(3).Infof("📭 NO TASK AVAILABLE: Worker %s - admin server has no tasks available", w.id) + glog.V(3).Infof("NO TASK AVAILABLE: Worker %s - admin server has no tasks available", w.id) } } } @@ -634,7 +634,6 @@ func (w *Worker) registerWorker() { // connectionMonitorLoop monitors connection status func (w *Worker) connectionMonitorLoop() { - glog.V(1).Infof("🔍 CONNECTION MONITOR STARTED: Worker %s connection monitor loop started", w.id) ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds defer ticker.Stop() @@ -643,7 +642,7 @@ func (w *Worker) connectionMonitorLoop() { for { select { case <-w.stopChan: - glog.V(1).Infof("🛑 CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id) + glog.V(1).Infof("CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id) return case <-ticker.C: // Monitor connection status and log changes @@ -651,16 +650,16 @@ func (w *Worker) connectionMonitorLoop() { if currentConnectionStatus != lastConnectionStatus { if currentConnectionStatus { - glog.Infof("🔗 CONNECTION RESTORED: Worker %s connection status changed: connected", w.id) + glog.Infof("CONNECTION RESTORED: Worker %s connection status changed: connected", w.id) } else { - glog.Warningf("⚠️ CONNECTION LOST: Worker %s connection status changed: disconnected", w.id) + glog.Warningf("CONNECTION LOST: Worker %s connection status changed: disconnected", w.id) } lastConnectionStatus = currentConnectionStatus } else { if currentConnectionStatus { - glog.V(3).Infof("✅ CONNECTION OK: Worker %s connection status: connected", w.id) + glog.V(3).Infof("CONNECTION OK: Worker %s connection status: connected", w.id) } else { - glog.V(1).Infof("🔌 CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id) + glog.V(1).Infof("CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id) } } } @@ -695,29 +694,29 @@ func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance { // messageProcessingLoop processes incoming admin messages func (w *Worker) messageProcessingLoop() { - glog.Infof("🔄 MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id) + glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id) // Get access to the incoming message channel from gRPC client grpcClient, ok := w.adminClient.(*GrpcAdminClient) if !ok { - glog.Warningf("⚠️ MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id) + glog.Warningf("MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id) return } incomingChan := grpcClient.GetIncomingChannel() - glog.V(1).Infof("📡 MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id) + glog.V(1).Infof("MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id) for { select { case <-w.stopChan: - glog.Infof("🛑 MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id) + glog.Infof("MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id) return case message := <-incomingChan: if message != nil { - glog.V(3).Infof("📥 MESSAGE PROCESSING: Worker %s processing incoming message", w.id) + glog.V(3).Infof("MESSAGE PROCESSING: Worker %s processing incoming message", w.id) w.processAdminMessage(message) } else { - glog.V(3).Infof("📭 NULL MESSAGE: Worker %s received nil message", w.id) + glog.V(3).Infof("NULL MESSAGE: Worker %s received nil message", w.id) } } } @@ -725,17 +724,17 @@ func (w *Worker) messageProcessingLoop() { // processAdminMessage processes different types of admin messages func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) { - glog.V(4).Infof("📫 ADMIN MESSAGE RECEIVED: Worker %s received admin message: %T", w.id, message.Message) + glog.V(4).Infof("ADMIN MESSAGE RECEIVED: Worker %s received admin message: %T", w.id, message.Message) switch msg := message.Message.(type) { case *worker_pb.AdminMessage_RegistrationResponse: - glog.V(2).Infof("✅ REGISTRATION RESPONSE: Worker %s received registration response", w.id) + glog.V(2).Infof("REGISTRATION RESPONSE: Worker %s received registration response", w.id) w.handleRegistrationResponse(msg.RegistrationResponse) case *worker_pb.AdminMessage_HeartbeatResponse: - glog.V(3).Infof("💓 HEARTBEAT RESPONSE: Worker %s received heartbeat response", w.id) + glog.V(3).Infof("HEARTBEAT RESPONSE: Worker %s received heartbeat response", w.id) w.handleHeartbeatResponse(msg.HeartbeatResponse) case *worker_pb.AdminMessage_TaskLogRequest: - glog.V(1).Infof("📋 TASK LOG REQUEST: Worker %s received task log request for task %s", w.id, msg.TaskLogRequest.TaskId) + glog.V(1).Infof("TASK LOG REQUEST: Worker %s received task log request for task %s", w.id, msg.TaskLogRequest.TaskId) w.handleTaskLogRequest(msg.TaskLogRequest) case *worker_pb.AdminMessage_TaskAssignment: taskAssign := msg.TaskAssignment @@ -756,16 +755,16 @@ func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) { } if err := w.HandleTask(task); err != nil { - glog.Errorf("❌ DIRECT TASK ASSIGNMENT FAILED: Worker %s failed to handle direct task assignment %s: %v", w.id, task.ID, err) + glog.Errorf("DIRECT TASK ASSIGNMENT FAILED: Worker %s failed to handle direct task assignment %s: %v", w.id, task.ID, err) } case *worker_pb.AdminMessage_TaskCancellation: - glog.Infof("🛑 TASK CANCELLATION: Worker %s received task cancellation for task %s", w.id, msg.TaskCancellation.TaskId) + glog.Infof("TASK CANCELLATION: Worker %s received task cancellation for task %s", w.id, msg.TaskCancellation.TaskId) w.handleTaskCancellation(msg.TaskCancellation) case *worker_pb.AdminMessage_AdminShutdown: - glog.Infof("🔄 ADMIN SHUTDOWN: Worker %s received admin shutdown message", w.id) + glog.Infof("ADMIN SHUTDOWN: Worker %s received admin shutdown message", w.id) w.handleAdminShutdown(msg.AdminShutdown) default: - glog.V(1).Infof("❓ UNKNOWN MESSAGE: Worker %s received unknown admin message type: %T", w.id, message.Message) + glog.V(1).Infof("UNKNOWN MESSAGE: Worker %s received unknown admin message type: %T", w.id, message.Message) } } |
