aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/worker.go')
-rw-r--r--weed/worker/worker.go95
1 files changed, 68 insertions, 27 deletions
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index 2bc0e1e11..49d1ea57f 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -4,7 +4,6 @@ import (
"context"
"crypto/rand"
"fmt"
- "net"
"os"
"path/filepath"
"strings"
@@ -78,43 +77,39 @@ func GenerateOrLoadWorkerID(workingDir string) (string, error) {
}
}
- // Generate new unique worker ID with host information
+ // Generate simplified worker ID
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown"
}
- // Get local IP address for better host identification
- var hostIP string
- if addrs, err := net.InterfaceAddrs(); err == nil {
- for _, addr := range addrs {
- if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
- if ipnet.IP.To4() != nil {
- hostIP = ipnet.IP.String()
- break
- }
+ // Use short hostname - for names longer than 6 chars, take the first dot-separated label (or the whole name if it has no dots), truncated to 6 chars
+ shortHostname := hostname
+ if len(hostname) > 6 {
+ if parts := strings.Split(hostname, "."); len(parts) > 1 {
+ // Use the first label, i.e. the host part before the domain (e.g., "worker1" from "worker1.example.com"); it is capped at 6 chars below
+ shortHostname = parts[0]
+ if len(shortHostname) > 6 {
+ shortHostname = shortHostname[:6]
}
+ } else {
+ // Use first 6 characters
+ shortHostname = hostname[:6]
}
}
- if hostIP == "" {
- hostIP = "noip"
- }
-
- // Create host identifier combining hostname and IP
- hostID := fmt.Sprintf("%s@%s", hostname, hostIP)
- // Generate random component for uniqueness
- randomBytes := make([]byte, 4)
+ // Generate random component for uniqueness (2 bytes = 4 hex chars)
+ randomBytes := make([]byte, 2)
var workerID string
if _, err := rand.Read(randomBytes); err != nil {
- // Fallback to timestamp if crypto/rand fails
- workerID = fmt.Sprintf("worker-%s-%d", hostID, time.Now().Unix())
+ // Fallback to short timestamp if crypto/rand fails
+ timestamp := time.Now().Unix() % 10000 // last 4 digits
+ workerID = fmt.Sprintf("w-%s-%04d", shortHostname, timestamp)
glog.Infof("Generated fallback worker ID: %s", workerID)
} else {
- // Use random bytes + timestamp for uniqueness
+ // Use random hex for uniqueness
randomHex := fmt.Sprintf("%x", randomBytes)
- timestamp := time.Now().Unix()
- workerID = fmt.Sprintf("worker-%s-%s-%d", hostID, randomHex, timestamp)
+ workerID = fmt.Sprintf("w-%s-%s", shortHostname, randomHex)
glog.Infof("Generated new worker ID: %s", workerID)
}
@@ -145,6 +140,10 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) {
// Initialize task log handler
logDir := filepath.Join(config.BaseWorkingDir, "task_logs")
+ // Ensure the base task log directory exists to avoid errors when admin requests logs
+ if err := os.MkdirAll(logDir, 0755); err != nil {
+ glog.Warningf("Failed to create task log base directory %s: %v", logDir, err)
+ }
taskLogHandler := tasks.NewTaskLogHandler(logDir)
worker := &Worker{
@@ -407,6 +406,26 @@ func (w *Worker) executeTask(task *types.TaskInput) {
// Use new task execution system with unified Task interface
glog.V(1).Infof("Executing task %s with typed protobuf parameters", task.ID)
+ // Initialize a file-based task logger so admin can retrieve logs
+ // Build minimal params for logger metadata
+ loggerParams := types.TaskParams{
+ VolumeID: task.VolumeID,
+ Collection: task.Collection,
+ TypedParams: task.TypedParams,
+ }
+ loggerConfig := w.getTaskLoggerConfig()
+ fileLogger, logErr := tasks.NewTaskLogger(task.ID, task.Type, w.id, loggerParams, loggerConfig)
+ if logErr != nil {
+ glog.Warningf("Failed to initialize file logger for task %s: %v", task.ID, logErr)
+ } else {
+ defer func() {
+ if err := fileLogger.Close(); err != nil {
+ glog.V(1).Infof("Failed to close task logger for %s: %v", task.ID, err)
+ }
+ }()
+ fileLogger.Info("Task %s started (type=%s, server=%s, collection=%s)", task.ID, task.Type, task.Server, task.Collection)
+ }
+
taskFactory := w.registry.Get(task.Type)
if taskFactory == nil {
w.completeTask(task.ID, false, fmt.Sprintf("task factory not available for %s: task type not found", task.Type))
@@ -431,13 +450,28 @@ func (w *Worker) executeTask(task *types.TaskInput) {
// Task execution uses the new unified Task interface
glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir)
+ // If we have a file logger, adapt it so task WithFields logs are captured into file
+ if fileLogger != nil {
+ if withLogger, ok := taskInstance.(interface{ SetLogger(types.Logger) }); ok {
+ withLogger.SetLogger(newTaskLoggerAdapter(fileLogger))
+ }
+ }
+
// Set progress callback that reports to admin server
- taskInstance.SetProgressCallback(func(progress float64) {
+ taskInstance.SetProgressCallback(func(progress float64, stage string) {
// Report progress updates to admin server
- glog.V(2).Infof("Task %s progress: %.1f%%", task.ID, progress)
+ glog.V(2).Infof("Task %s progress: %.1f%% - %s", task.ID, progress, stage)
if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil {
glog.V(1).Infof("Failed to report task progress to admin: %v", err)
}
+ if fileLogger != nil {
+ // Use meaningful stage description or fallback to generic message
+ message := stage
+ if message == "" {
+ message = fmt.Sprintf("Progress: %.1f%%", progress)
+ }
+ fileLogger.LogProgress(progress, message)
+ }
})
// Execute task with context
@@ -449,10 +483,17 @@ func (w *Worker) executeTask(task *types.TaskInput) {
w.completeTask(task.ID, false, err.Error())
w.tasksFailed++
glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
+ if fileLogger != nil {
+ fileLogger.LogStatus("failed", err.Error())
+ fileLogger.Error("Task %s failed: %v", task.ID, err)
+ }
} else {
w.completeTask(task.ID, true, "")
w.tasksCompleted++
glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
+ if fileLogger != nil {
+ fileLogger.Info("Task %s completed successfully", task.ID)
+ }
}
}
@@ -696,7 +737,7 @@ func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) {
Type: types.TaskType(taskAssign.TaskType),
Status: types.TaskStatusAssigned,
VolumeID: taskAssign.Params.VolumeId,
- Server: taskAssign.Params.Server,
+ Server: getServerFromParams(taskAssign.Params),
Collection: taskAssign.Params.Collection,
Priority: types.TaskPriority(taskAssign.Priority),
CreatedAt: time.Unix(taskAssign.CreatedTime, 0),