aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/worker.go')
-rw-r--r--weed/worker/worker.go69
1 files changed, 34 insertions, 35 deletions
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index ccebbf011..c1ddf8b34 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -209,26 +209,26 @@ func (w *Worker) Start() error {
}
// Start connection attempt (will register immediately if successful)
- glog.Infof("🚀 WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d",
+ glog.Infof("WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d",
w.id, w.config.Capabilities, w.config.MaxConcurrent)
// Try initial connection, but don't fail if it doesn't work immediately
if err := w.adminClient.Connect(); err != nil {
- glog.Warningf("⚠️ INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err)
+ glog.Warningf("INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err)
// Don't return error - let the reconnection loop handle it
} else {
- glog.Infof("✅ INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id)
+ glog.Infof("INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id)
}
// Start worker loops regardless of initial connection status
// They will handle connection failures gracefully
- glog.V(1).Infof("🔄 STARTING LOOPS: Worker %s starting background loops", w.id)
+ glog.V(1).Infof("STARTING LOOPS: Worker %s starting background loops", w.id)
go w.heartbeatLoop()
go w.taskRequestLoop()
go w.connectionMonitorLoop()
go w.messageProcessingLoop()
- glog.Infof("✅ WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id)
+ glog.Infof("WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id)
return nil
}
@@ -325,7 +325,7 @@ func (w *Worker) HandleTask(task *types.TaskInput) error {
currentLoad := len(w.currentTasks)
if currentLoad >= w.config.MaxConcurrent {
w.mutex.Unlock()
- glog.Errorf("❌ TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
+ glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
w.id, currentLoad, w.config.MaxConcurrent, task.ID)
return fmt.Errorf("worker is at capacity")
}
@@ -334,7 +334,7 @@ func (w *Worker) HandleTask(task *types.TaskInput) error {
newLoad := len(w.currentTasks)
w.mutex.Unlock()
- glog.Infof("✅ TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
+ glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
w.id, task.ID, newLoad, w.config.MaxConcurrent)
// Execute task in goroutine
@@ -379,11 +379,11 @@ func (w *Worker) executeTask(task *types.TaskInput) {
w.mutex.Unlock()
duration := time.Since(startTime)
- glog.Infof("🏁 TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d",
+ glog.Infof("TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d",
w.id, task.ID, duration, currentLoad, w.config.MaxConcurrent)
}()
- glog.Infof("🚀 TASK EXECUTION STARTED: Worker %s starting execution of task %s (type: %s, volume: %d, server: %s, collection: %s) at %v",
+ glog.Infof("TASK EXECUTION STARTED: Worker %s starting execution of task %s (type: %s, volume: %d, server: %s, collection: %s) at %v",
w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339))
// Report task start to admin server
@@ -570,29 +570,29 @@ func (w *Worker) requestTasks() {
w.mutex.RUnlock()
if currentLoad >= w.config.MaxConcurrent {
- glog.V(3).Infof("🚫 TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)",
+ glog.V(3).Infof("TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)",
w.id, currentLoad, w.config.MaxConcurrent)
return // Already at capacity
}
if w.adminClient != nil {
- glog.V(3).Infof("📞 REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)",
+ glog.V(3).Infof("REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)",
w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities)
task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities)
if err != nil {
- glog.V(2).Infof("❌ TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err)
+ glog.V(2).Infof("TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err)
return
}
if task != nil {
- glog.Infof("📨 TASK RESPONSE RECEIVED: Worker %s received task from admin server - ID: %s, Type: %s",
+ glog.Infof("TASK RESPONSE RECEIVED: Worker %s received task from admin server - ID: %s, Type: %s",
w.id, task.ID, task.Type)
if err := w.HandleTask(task); err != nil {
- glog.Errorf("❌ TASK HANDLING FAILED: Worker %s failed to handle task %s: %v", w.id, task.ID, err)
+ glog.Errorf("TASK HANDLING FAILED: Worker %s failed to handle task %s: %v", w.id, task.ID, err)
}
} else {
- glog.V(3).Infof("📭 NO TASK AVAILABLE: Worker %s - admin server has no tasks available", w.id)
+ glog.V(3).Infof("NO TASK AVAILABLE: Worker %s - admin server has no tasks available", w.id)
}
}
}
@@ -634,7 +634,6 @@ func (w *Worker) registerWorker() {
// connectionMonitorLoop monitors connection status
func (w *Worker) connectionMonitorLoop() {
- glog.V(1).Infof("🔍 CONNECTION MONITOR STARTED: Worker %s connection monitor loop started", w.id)
ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
defer ticker.Stop()
@@ -643,7 +642,7 @@ func (w *Worker) connectionMonitorLoop() {
for {
select {
case <-w.stopChan:
- glog.V(1).Infof("🛑 CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id)
+ glog.V(1).Infof("CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id)
return
case <-ticker.C:
// Monitor connection status and log changes
@@ -651,16 +650,16 @@ func (w *Worker) connectionMonitorLoop() {
if currentConnectionStatus != lastConnectionStatus {
if currentConnectionStatus {
- glog.Infof("🔗 CONNECTION RESTORED: Worker %s connection status changed: connected", w.id)
+ glog.Infof("CONNECTION RESTORED: Worker %s connection status changed: connected", w.id)
} else {
- glog.Warningf("⚠️ CONNECTION LOST: Worker %s connection status changed: disconnected", w.id)
+ glog.Warningf("CONNECTION LOST: Worker %s connection status changed: disconnected", w.id)
}
lastConnectionStatus = currentConnectionStatus
} else {
if currentConnectionStatus {
- glog.V(3).Infof("✅ CONNECTION OK: Worker %s connection status: connected", w.id)
+ glog.V(3).Infof("CONNECTION OK: Worker %s connection status: connected", w.id)
} else {
- glog.V(1).Infof("🔌 CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id)
+ glog.V(1).Infof("CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id)
}
}
}
@@ -695,29 +694,29 @@ func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
// messageProcessingLoop processes incoming admin messages
func (w *Worker) messageProcessingLoop() {
- glog.Infof("🔄 MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)
+ glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)
// Get access to the incoming message channel from gRPC client
grpcClient, ok := w.adminClient.(*GrpcAdminClient)
if !ok {
- glog.Warningf("⚠️ MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id)
+ glog.Warningf("MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id)
return
}
incomingChan := grpcClient.GetIncomingChannel()
- glog.V(1).Infof("📡 MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id)
+ glog.V(1).Infof("MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id)
for {
select {
case <-w.stopChan:
- glog.Infof("🛑 MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id)
+ glog.Infof("MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id)
return
case message := <-incomingChan:
if message != nil {
- glog.V(3).Infof("📥 MESSAGE PROCESSING: Worker %s processing incoming message", w.id)
+ glog.V(3).Infof("MESSAGE PROCESSING: Worker %s processing incoming message", w.id)
w.processAdminMessage(message)
} else {
- glog.V(3).Infof("📭 NULL MESSAGE: Worker %s received nil message", w.id)
+ glog.V(3).Infof("NULL MESSAGE: Worker %s received nil message", w.id)
}
}
}
@@ -725,17 +724,17 @@ func (w *Worker) messageProcessingLoop() {
// processAdminMessage processes different types of admin messages
func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) {
- glog.V(4).Infof("📫 ADMIN MESSAGE RECEIVED: Worker %s received admin message: %T", w.id, message.Message)
+ glog.V(4).Infof("ADMIN MESSAGE RECEIVED: Worker %s received admin message: %T", w.id, message.Message)
switch msg := message.Message.(type) {
case *worker_pb.AdminMessage_RegistrationResponse:
- glog.V(2).Infof("✅ REGISTRATION RESPONSE: Worker %s received registration response", w.id)
+ glog.V(2).Infof("REGISTRATION RESPONSE: Worker %s received registration response", w.id)
w.handleRegistrationResponse(msg.RegistrationResponse)
case *worker_pb.AdminMessage_HeartbeatResponse:
- glog.V(3).Infof("💓 HEARTBEAT RESPONSE: Worker %s received heartbeat response", w.id)
+ glog.V(3).Infof("HEARTBEAT RESPONSE: Worker %s received heartbeat response", w.id)
w.handleHeartbeatResponse(msg.HeartbeatResponse)
case *worker_pb.AdminMessage_TaskLogRequest:
- glog.V(1).Infof("📋 TASK LOG REQUEST: Worker %s received task log request for task %s", w.id, msg.TaskLogRequest.TaskId)
+ glog.V(1).Infof("TASK LOG REQUEST: Worker %s received task log request for task %s", w.id, msg.TaskLogRequest.TaskId)
w.handleTaskLogRequest(msg.TaskLogRequest)
case *worker_pb.AdminMessage_TaskAssignment:
taskAssign := msg.TaskAssignment
@@ -756,16 +755,16 @@ func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) {
}
if err := w.HandleTask(task); err != nil {
- glog.Errorf("❌ DIRECT TASK ASSIGNMENT FAILED: Worker %s failed to handle direct task assignment %s: %v", w.id, task.ID, err)
+ glog.Errorf("DIRECT TASK ASSIGNMENT FAILED: Worker %s failed to handle direct task assignment %s: %v", w.id, task.ID, err)
}
case *worker_pb.AdminMessage_TaskCancellation:
- glog.Infof("🛑 TASK CANCELLATION: Worker %s received task cancellation for task %s", w.id, msg.TaskCancellation.TaskId)
+ glog.Infof("TASK CANCELLATION: Worker %s received task cancellation for task %s", w.id, msg.TaskCancellation.TaskId)
w.handleTaskCancellation(msg.TaskCancellation)
case *worker_pb.AdminMessage_AdminShutdown:
- glog.Infof("🔄 ADMIN SHUTDOWN: Worker %s received admin shutdown message", w.id)
+ glog.Infof("ADMIN SHUTDOWN: Worker %s received admin shutdown message", w.id)
w.handleAdminShutdown(msg.AdminShutdown)
default:
- glog.V(1).Infof("❓ UNKNOWN MESSAGE: Worker %s received unknown admin message type: %T", w.id, message.Message)
+ glog.V(1).Infof("UNKNOWN MESSAGE: Worker %s received unknown admin message type: %T", w.id, message.Message)
}
}