diff options
Diffstat (limited to 'weed/worker/worker.go')
| -rw-r--r-- | weed/worker/worker.go | 95 |
1 files changed, 68 insertions, 27 deletions
diff --git a/weed/worker/worker.go b/weed/worker/worker.go index 2bc0e1e11..49d1ea57f 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -4,7 +4,6 @@ import ( "context" "crypto/rand" "fmt" - "net" "os" "path/filepath" "strings" @@ -78,43 +77,39 @@ func GenerateOrLoadWorkerID(workingDir string) (string, error) { } } - // Generate new unique worker ID with host information + // Generate simplified worker ID hostname, _ := os.Hostname() if hostname == "" { hostname = "unknown" } - // Get local IP address for better host identification - var hostIP string - if addrs, err := net.InterfaceAddrs(); err == nil { - for _, addr := range addrs { - if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { - if ipnet.IP.To4() != nil { - hostIP = ipnet.IP.String() - break - } + // Use short hostname - take first 6 chars or last part after dots + shortHostname := hostname + if len(hostname) > 6 { + if parts := strings.Split(hostname, "."); len(parts) > 1 { + // Use last part before domain (e.g., "worker1" from "worker1.example.com") + shortHostname = parts[0] + if len(shortHostname) > 6 { + shortHostname = shortHostname[:6] } + } else { + // Use first 6 characters + shortHostname = hostname[:6] } } - if hostIP == "" { - hostIP = "noip" - } - - // Create host identifier combining hostname and IP - hostID := fmt.Sprintf("%s@%s", hostname, hostIP) - // Generate random component for uniqueness - randomBytes := make([]byte, 4) + // Generate random component for uniqueness (2 bytes = 4 hex chars) + randomBytes := make([]byte, 2) var workerID string if _, err := rand.Read(randomBytes); err != nil { - // Fallback to timestamp if crypto/rand fails - workerID = fmt.Sprintf("worker-%s-%d", hostID, time.Now().Unix()) + // Fallback to short timestamp if crypto/rand fails + timestamp := time.Now().Unix() % 10000 // last 4 digits + workerID = fmt.Sprintf("w-%s-%04d", shortHostname, timestamp) glog.Infof("Generated fallback worker ID: %s", workerID) } else { - // Use random bytes + timestamp for uniqueness + // Use random hex for uniqueness randomHex := fmt.Sprintf("%x", randomBytes) - timestamp := time.Now().Unix() - workerID = fmt.Sprintf("worker-%s-%s-%d", hostID, randomHex, timestamp) + workerID = fmt.Sprintf("w-%s-%s", shortHostname, randomHex) glog.Infof("Generated new worker ID: %s", workerID) } @@ -145,6 +140,10 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) { // Initialize task log handler logDir := filepath.Join(config.BaseWorkingDir, "task_logs") + // Ensure the base task log directory exists to avoid errors when admin requests logs + if err := os.MkdirAll(logDir, 0755); err != nil { + glog.Warningf("Failed to create task log base directory %s: %v", logDir, err) + } taskLogHandler := tasks.NewTaskLogHandler(logDir) worker := &Worker{ @@ -407,6 +406,26 @@ func (w *Worker) executeTask(task *types.TaskInput) { // Use new task execution system with unified Task interface glog.V(1).Infof("Executing task %s with typed protobuf parameters", task.ID) + // Initialize a file-based task logger so admin can retrieve logs + // Build minimal params for logger metadata + loggerParams := types.TaskParams{ + VolumeID: task.VolumeID, + Collection: task.Collection, + TypedParams: task.TypedParams, + } + loggerConfig := w.getTaskLoggerConfig() + fileLogger, logErr := tasks.NewTaskLogger(task.ID, task.Type, w.id, loggerParams, loggerConfig) + if logErr != nil { + glog.Warningf("Failed to initialize file logger for task %s: %v", task.ID, logErr) + } else { + defer func() { + if err := fileLogger.Close(); err != nil { + glog.V(1).Infof("Failed to close task logger for %s: %v", task.ID, err) + } + }() + fileLogger.Info("Task %s started (type=%s, server=%s, collection=%s)", task.ID, task.Type, task.Server, task.Collection) + } + taskFactory := w.registry.Get(task.Type) if taskFactory == nil { w.completeTask(task.ID, false, fmt.Sprintf("task factory not available for %s: task type not found", task.Type)) @@ -431,13 +450,28 @@ func (w *Worker) executeTask(task *types.TaskInput) { // Task execution uses the new unified Task interface glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir) + // If we have a file logger, adapt it so task WithFields logs are captured into file + if fileLogger != nil { + if withLogger, ok := taskInstance.(interface{ SetLogger(types.Logger) }); ok { + withLogger.SetLogger(newTaskLoggerAdapter(fileLogger)) + } + } + // Set progress callback that reports to admin server - taskInstance.SetProgressCallback(func(progress float64) { + taskInstance.SetProgressCallback(func(progress float64, stage string) { // Report progress updates to admin server - glog.V(2).Infof("Task %s progress: %.1f%%", task.ID, progress) + glog.V(2).Infof("Task %s progress: %.1f%% - %s", task.ID, progress, stage) if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil { glog.V(1).Infof("Failed to report task progress to admin: %v", err) } + if fileLogger != nil { + // Use meaningful stage description or fallback to generic message + message := stage + if message == "" { + message = fmt.Sprintf("Progress: %.1f%%", progress) + } + fileLogger.LogProgress(progress, message) + } }) // Execute task with context @@ -449,10 +483,17 @@ func (w *Worker) executeTask(task *types.TaskInput) { w.completeTask(task.ID, false, err.Error()) w.tasksFailed++ glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err) + if fileLogger != nil { + fileLogger.LogStatus("failed", err.Error()) + fileLogger.Error("Task %s failed: %v", task.ID, err) + } } else { w.completeTask(task.ID, true, "") w.tasksCompleted++ glog.Infof("Worker %s completed task %s successfully", w.id, task.ID) + if fileLogger != nil { + fileLogger.Info("Task %s completed successfully", task.ID) + } } } @@ -696,7 +737,7 @@ func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) { Type: types.TaskType(taskAssign.TaskType), Status: types.TaskStatusAssigned, VolumeID: taskAssign.Params.VolumeId, - Server: taskAssign.Params.Server, + Server: getServerFromParams(taskAssign.Params), Collection: taskAssign.Params.Collection, Priority: types.TaskPriority(taskAssign.Priority), CreatedAt: time.Unix(taskAssign.CreatedTime, 0), |
