| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-30 12:38:03 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-30 12:38:03 -0700 |
| commit | 891a2fb6ebc324329f5330a140b8cacff3899db4 (patch) | |
| tree | d02aaa80a909e958aea831f206b3240b0237d7b7 /weed/worker/tasks/task_logger.go | |
| parent | 64198dad8346fe284cbef944fe01ff0d062c147d (diff) | |
Admin: misc improvements on admin server and workers. EC now works. (#7055)
* initial design
* added simulation as tests
* reorganized the codebase to move the simulation framework and tests into their own dedicated package
* integration test. ec worker task
* remove "enhanced" reference
* start master, volume servers, filer
Current Status
✅ Master: Healthy and running (port 9333)
✅ Filer: Healthy and running (port 8888)
✅ Volume Servers: All 6 servers running (ports 8080-8085)
🔄 Admin/Workers: Will start when dependencies are ready
* generate write load
* tasks are assigned
* admin start with grpc port. worker has its own working directory
* Update .gitignore
* working worker and admin. Task detection is not working yet.
* compiles, detection uses volumeSizeLimitMB from master
* compiles
* worker retries connecting to admin
* build and restart
* rendering pending tasks
* skip task ID column
* sticky worker id
* test canScheduleTaskNow
* worker reconnect to admin
* clean up logs
* worker register itself first
* worker can run ec work and report status
but:
1. one volume should not be repeatedly worked on.
2. ec shards need to be distributed and source data should be deleted.
* move ec task logic
* listing ec shards
* local copy, ec. Need to distribute.
* ec is mostly working now
* distribution of ec shards needs improvement
* need configuration to enable ec
* show ec volumes
* interval field UI component
* rename
* integration test with vacuuming
* garbage percentage threshold
* fix warning
* display ec shard sizes
* fix ec volumes list
* Update ui.go
* show default values
* ensure correct default value
* MaintenanceConfig use ConfigField
* use schema defined defaults
* config
* reduce duplication
* refactor to use BaseUIProvider
* each task register its schema
* checkECEncodingCandidate use ecDetector
* use vacuumDetector
* use volumeSizeLimitMB
* remove
remove
* remove unused
* refactor
* use new framework
* remove v2 reference
* refactor
* left menu can scroll now
* The maintenance manager was not being initialized when no data directory was configured for persistent storage.
* saving config
* Update task_config_schema_templ.go
* enable/disable tasks
* protobuf encoded task configurations
* fix system settings
* use ui component
* remove logs
* interface{} Reduction
* reduce interface{}
* reduce interface{}
* avoid from/to map
* reduce interface{}
* refactor
* keep it DRY
* added logging
* debug messages
* debug level
* debug
* show the log caller line
* use configured task policy
* log level
* handle admin heartbeat response
* Update worker.go
* fix EC rack and dc count
* Report task status to admin server
* fix task logging, simplify interface checking, use erasure_coding constants
* factor in empty volume server during task planning
* volume.list adds disk id
* track disk id also
* fix locking scheduled and manual scanning
* add active topology
* simplify task detector
* ec task completed, but shards are not showing up
* implement ec in ec_typed.go
* adjust log level
* dedup
* implementing ec copying shards and only ecx files
* use disk id when distributing ec shards
🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned
* Delete original volume from all locations
* clean up existing shard locations
* local encoding and distributing
* Update docker/admin_integration/EC-TESTING-README.md
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* check volume id range
* simplify
* fix tests
* fix types
* clean up logs and tests
---------
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Diffstat (limited to 'weed/worker/tasks/task_logger.go')
| -rw-r--r-- | weed/worker/tasks/task_logger.go | 432 |
1 files changed, 432 insertions, 0 deletions
diff --git a/weed/worker/tasks/task_logger.go b/weed/worker/tasks/task_logger.go
new file mode 100644
index 000000000..e9c06c35c
--- /dev/null
+++ b/weed/worker/tasks/task_logger.go
@@ -0,0 +1,432 @@
+package tasks
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TaskLogger provides file-based logging for individual tasks
+type TaskLogger interface {
+	// Log methods
+	Info(message string, args ...interface{})
+	Warning(message string, args ...interface{})
+	Error(message string, args ...interface{})
+	Debug(message string, args ...interface{})
+
+	// Progress and status logging
+	LogProgress(progress float64, message string)
+	LogStatus(status string, message string)
+
+	// Structured logging
+	LogWithFields(level string, message string, fields map[string]interface{})
+
+	// Lifecycle
+	Close() error
+	GetLogDir() string
+}
+
+// LoggerProvider interface for tasks that support logging
+type LoggerProvider interface {
+	InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error
+	GetTaskLogger() TaskLogger
+}
+
+// TaskLoggerConfig holds configuration for task logging
+type TaskLoggerConfig struct {
+	BaseLogDir    string
+	MaxTasks      int  // Maximum number of task logs to keep
+	MaxLogSizeMB  int  // Maximum log file size in MB
+	EnableConsole bool // Also log to console
+}
+
+// FileTaskLogger implements TaskLogger using file-based logging
+type FileTaskLogger struct {
+	taskID   string
+	taskType types.TaskType
+	workerID string
+	logDir   string
+	logFile  *os.File
+	mutex    sync.Mutex
+	config   TaskLoggerConfig
+	metadata *TaskLogMetadata
+	closed   bool
+}
+
+// TaskLogMetadata contains metadata about the task execution
+type TaskLogMetadata struct {
+	TaskID      string                 `json:"task_id"`
+	TaskType    string                 `json:"task_type"`
+	WorkerID    string                 `json:"worker_id"`
+	StartTime   time.Time              `json:"start_time"`
+	EndTime     *time.Time             `json:"end_time,omitempty"`
+	Duration    *time.Duration         `json:"duration,omitempty"`
+	Status      string                 `json:"status"`
+	Progress    float64                `json:"progress"`
+	VolumeID    uint32                 `json:"volume_id,omitempty"`
+	Server      string                 `json:"server,omitempty"`
+	Collection  string                 `json:"collection,omitempty"`
+	CustomData  map[string]interface{} `json:"custom_data,omitempty"`
+	LogFilePath string                 `json:"log_file_path"`
+	CreatedAt   time.Time              `json:"created_at"`
+}
+
+// TaskLogEntry represents a single log entry
+type TaskLogEntry struct {
+	Timestamp time.Time              `json:"timestamp"`
+	Level     string                 `json:"level"`
+	Message   string                 `json:"message"`
+	Fields    map[string]interface{} `json:"fields,omitempty"`
+	Progress  *float64               `json:"progress,omitempty"`
+	Status    *string                `json:"status,omitempty"`
+}
+
+// DefaultTaskLoggerConfig returns default configuration
+func DefaultTaskLoggerConfig() TaskLoggerConfig {
+	return TaskLoggerConfig{
+		BaseLogDir:    "/data/task_logs", // Use persistent data directory
+		MaxTasks:      100,               // Keep last 100 task logs
+		MaxLogSizeMB:  10,
+		EnableConsole: true,
+	}
+}
+
+// NewTaskLogger creates a new file-based task logger
+func NewTaskLogger(taskID string, taskType types.TaskType, workerID string, params types.TaskParams, config TaskLoggerConfig) (TaskLogger, error) {
+	// Create unique directory name with timestamp
+	timestamp := time.Now().Format("20060102_150405")
+	dirName := fmt.Sprintf("%s_%s_%s_%s", taskID, taskType, workerID, timestamp)
+	logDir := filepath.Join(config.BaseLogDir, dirName)
+
+	// Create log directory
+	if err := os.MkdirAll(logDir, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create log directory %s: %w", logDir, err)
+	}
+
+	// Create log file
+	logFilePath := filepath.Join(logDir, "task.log")
+	logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create log file %s: %w", logFilePath, err)
+	}
+
+	// Create metadata
+	metadata := &TaskLogMetadata{
+		TaskID:      taskID,
+		TaskType:    string(taskType),
+		WorkerID:    workerID,
+		StartTime:   time.Now(),
+		Status:      "started",
+		Progress:    0.0,
+		VolumeID:    params.VolumeID,
+		Server:      params.Server,
+		Collection:  params.Collection,
+		CustomData:  make(map[string]interface{}),
+		LogFilePath: logFilePath,
+		CreatedAt:   time.Now(),
+	}
+
+	logger := &FileTaskLogger{
+		taskID:   taskID,
+		taskType: taskType,
+		workerID: workerID,
+		logDir:   logDir,
+		logFile:  logFile,
+		config:   config,
+		metadata: metadata,
+		closed:   false,
+	}
+
+	// Write initial log entry
+	logger.Info("Task logger initialized for %s (type: %s, worker: %s)", taskID, taskType, workerID)
+	logger.LogWithFields("INFO", "Task parameters", map[string]interface{}{
+		"volume_id":  params.VolumeID,
+		"server":     params.Server,
+		"collection": params.Collection,
+	})
+
+	// Save initial metadata
+	if err := logger.saveMetadata(); err != nil {
+		glog.Warningf("Failed to save initial task metadata: %v", err)
+	}
+
+	// Clean up old task logs
+	go logger.cleanupOldLogs()
+
+	return logger, nil
+}
+
+// Info logs an info message
+func (l *FileTaskLogger) Info(message string, args ...interface{}) {
+	l.log("INFO", message, args...)
+}
+
+// Warning logs a warning message
+func (l *FileTaskLogger) Warning(message string, args ...interface{}) {
+	l.log("WARNING", message, args...)
+}
+
+// Error logs an error message
+func (l *FileTaskLogger) Error(message string, args ...interface{}) {
+	l.log("ERROR", message, args...)
+}
+
+// Debug logs a debug message
+func (l *FileTaskLogger) Debug(message string, args ...interface{}) {
+	l.log("DEBUG", message, args...)
+}
+
+// LogProgress logs task progress
+func (l *FileTaskLogger) LogProgress(progress float64, message string) {
+	l.mutex.Lock()
+	l.metadata.Progress = progress
+	l.mutex.Unlock()
+
+	entry := TaskLogEntry{
+		Timestamp: time.Now(),
+		Level:     "INFO",
+		Message:   message,
+		Progress:  &progress,
+	}
+
+	l.writeLogEntry(entry)
+	l.saveMetadata() // Update metadata with new progress
+}
+
+// LogStatus logs task status change
+func (l *FileTaskLogger) LogStatus(status string, message string) {
+	l.mutex.Lock()
+	l.metadata.Status = status
+	l.mutex.Unlock()
+
+	entry := TaskLogEntry{
+		Timestamp: time.Now(),
+		Level:     "INFO",
+		Message:   message,
+		Status:    &status,
+	}
+
+	l.writeLogEntry(entry)
+	l.saveMetadata() // Update metadata with new status
+}
+
+// LogWithFields logs a message with structured fields
+func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[string]interface{}) {
+	entry := TaskLogEntry{
+		Timestamp: time.Now(),
+		Level:     level,
+		Message:   message,
+		Fields:    fields,
+	}
+
+	l.writeLogEntry(entry)
+}
+
+// Close closes the logger and finalizes metadata
+func (l *FileTaskLogger) Close() error {
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+
+	if l.closed {
+		return nil
+	}
+
+	// Finalize metadata
+	endTime := time.Now()
+	duration := endTime.Sub(l.metadata.StartTime)
+	l.metadata.EndTime = &endTime
+	l.metadata.Duration = &duration
+
+	if l.metadata.Status == "started" {
+		l.metadata.Status = "completed"
+	}
+
+	// Save final metadata
+	l.saveMetadata()
+
+	// Close log file
+	if l.logFile != nil {
+		if err := l.logFile.Close(); err != nil {
+			return fmt.Errorf("failed to close log file: %w", err)
+		}
+	}
+
+	l.closed = true
+	l.Info("Task logger closed for %s", l.taskID)
+
+	return nil
+}
+
+// GetLogDir returns the log directory path
+func (l *FileTaskLogger) GetLogDir() string {
+	return l.logDir
+}
+
+// log is the internal logging method
+func (l *FileTaskLogger) log(level string, message string, args ...interface{}) {
+	formattedMessage := fmt.Sprintf(message, args...)
+
+	entry := TaskLogEntry{
+		Timestamp: time.Now(),
+		Level:     level,
+		Message:   formattedMessage,
+	}
+
+	l.writeLogEntry(entry)
+}
+
+// writeLogEntry writes a log entry to the file
+func (l *FileTaskLogger) writeLogEntry(entry TaskLogEntry) {
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+
+	if l.closed || l.logFile == nil {
+		return
+	}
+
+	// Format as JSON line
+	jsonData, err := json.Marshal(entry)
+	if err != nil {
+		glog.Errorf("Failed to marshal log entry: %v", err)
+		return
+	}
+
+	// Write to file
+	if _, err := l.logFile.WriteString(string(jsonData) + "\n"); err != nil {
+		glog.Errorf("Failed to write log entry: %v", err)
+		return
+	}
+
+	// Flush to disk
+	if err := l.logFile.Sync(); err != nil {
+		glog.Errorf("Failed to sync log file: %v", err)
+	}
+
+	// Also log to console and stderr if enabled
+	if l.config.EnableConsole {
+		// Log to glog with proper call depth to show actual source location
+		// We need depth 3 to skip: writeLogEntry -> log -> Info/Warning/Error calls to reach the original caller
+		formattedMsg := fmt.Sprintf("[TASK-%s] %s: %s", l.taskID, entry.Level, entry.Message)
+		switch entry.Level {
+		case "ERROR":
+			glog.ErrorDepth(3, formattedMsg)
+		case "WARNING":
+			glog.WarningDepth(3, formattedMsg)
+		default: // INFO, DEBUG, etc.
+			glog.InfoDepth(3, formattedMsg)
+		}
+		// Also log to stderr for immediate visibility
+		fmt.Fprintf(os.Stderr, "[TASK-%s] %s: %s\n", l.taskID, entry.Level, entry.Message)
+	}
+}
+
+// saveMetadata saves task metadata to file
+func (l *FileTaskLogger) saveMetadata() error {
+	metadataPath := filepath.Join(l.logDir, "metadata.json")
+
+	data, err := json.MarshalIndent(l.metadata, "", " ")
+	if err != nil {
+		return fmt.Errorf("failed to marshal metadata: %w", err)
+	}
+
+	return os.WriteFile(metadataPath, data, 0644)
+}
+
+// cleanupOldLogs removes old task log directories to maintain the limit
+func (l *FileTaskLogger) cleanupOldLogs() {
+	baseDir := l.config.BaseLogDir
+
+	entries, err := os.ReadDir(baseDir)
+	if err != nil {
+		glog.Warningf("Failed to read log directory %s: %v", baseDir, err)
+		return
+	}
+
+	// Filter for directories only
+	var dirs []os.DirEntry
+	for _, entry := range entries {
+		if entry.IsDir() {
+			dirs = append(dirs, entry)
+		}
+	}
+
+	// If we're under the limit, nothing to clean
+	if len(dirs) <= l.config.MaxTasks {
+		return
+	}
+
+	// Sort by modification time (oldest first)
+	sort.Slice(dirs, func(i, j int) bool {
+		infoI, errI := dirs[i].Info()
+		infoJ, errJ := dirs[j].Info()
+		if errI != nil || errJ != nil {
+			return false
+		}
+		return infoI.ModTime().Before(infoJ.ModTime())
+	})
+
+	// Remove oldest directories
+	numToRemove := len(dirs) - l.config.MaxTasks
+	for i := 0; i < numToRemove; i++ {
+		dirPath := filepath.Join(baseDir, dirs[i].Name())
+		if err := os.RemoveAll(dirPath); err != nil {
+			glog.Warningf("Failed to remove old log directory %s: %v", dirPath, err)
+		} else {
+			glog.V(1).Infof("Cleaned up old task log directory: %s", dirPath)
+		}
+	}
+
+	glog.V(1).Infof("Task log cleanup completed: removed %d old directories", numToRemove)
+}
+
+// GetTaskLogMetadata reads metadata from a task log directory
+func GetTaskLogMetadata(logDir string) (*TaskLogMetadata, error) {
+	metadataPath := filepath.Join(logDir, "metadata.json")
+
+	data, err := os.ReadFile(metadataPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read metadata file: %w", err)
+	}
+
+	var metadata TaskLogMetadata
+	if err := json.Unmarshal(data, &metadata); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
+	}
+
+	return &metadata, nil
+}
+
+// ReadTaskLogs reads all log entries from a task log file
+func ReadTaskLogs(logDir string) ([]TaskLogEntry, error) {
+	logPath := filepath.Join(logDir, "task.log")
+
+	file, err := os.Open(logPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open log file: %w", err)
+	}
+	defer file.Close()
+
+	var entries []TaskLogEntry
+	decoder := json.NewDecoder(file)
+
+	for {
+		var entry TaskLogEntry
+		if err := decoder.Decode(&entry); err != nil {
+			if err == io.EOF {
+				break
+			}
+			return nil, fmt.Errorf("failed to decode log entry: %w", err)
+		}
+		entries = append(entries, entry)
+	}
+
+	return entries, nil
+}
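Review note on Close(): as written in the diff, Close() holds l.mutex (released only by the deferred Unlock) and then calls l.Info, which reaches writeLogEntry and tries to lock l.mutex again. Go's sync.Mutex is not reentrant, so that call would block. In addition, l.closed is already true at that point, so the entry would be skipped even without the lock problem. The sketch below shows one possible way to restructure this. It is not the project's code: sketchLogger, write, and writeLocked are hypothetical names, and the file I/O is replaced by an in-memory slice to keep the example self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchLogger is a hypothetical stand-in for FileTaskLogger. Only the
// locking behaviour is modelled; log lines go to a slice, not a file.
type sketchLogger struct {
	mu     sync.Mutex
	closed bool
	lines  []string
}

// write acquires the lock itself, like writeLogEntry in the diff.
func (l *sketchLogger) write(msg string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.writeLocked(msg)
}

// writeLocked assumes the caller already holds l.mu.
func (l *sketchLogger) writeLocked(msg string) {
	if l.closed {
		return
	}
	l.lines = append(l.lines, msg)
}

// Close records the final message before marking the logger closed, and
// goes through writeLocked so it never tries to re-acquire l.mu.
func (l *sketchLogger) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.closed {
		return nil
	}
	l.writeLocked("task logger closed")
	l.closed = true
	return nil
}

func main() {
	l := &sketchLogger{}
	l.write("working")
	_ = l.Close()
	_ = l.Close()                  // second call is a no-op
	l.write("ignored after close") // dropped because the logger is closed
	fmt.Println(l.lines)
}
```

Splitting each method into a locking wrapper and a "caller holds the lock" variant is a common convention for non-reentrant mutexes. In the real type, the final entry would also need to be written before the log file handle is closed.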

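For readers who want to inspect a task.log file: writeLogEntry emits one JSON object per line, and ReadTaskLogs reads them back by calling json.Decoder.Decode in a loop until io.EOF. The following is a minimal, self-contained sketch of that round trip. The logEntry type copies only a subset of TaskLogEntry's fields, decodeString is a hypothetical convenience wrapper, and the sample lines are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"time"
)

// logEntry mirrors a subset of TaskLogEntry from the diff.
type logEntry struct {
	Timestamp time.Time `json:"timestamp"`
	Level     string    `json:"level"`
	Message   string    `json:"message"`
	Progress  *float64  `json:"progress,omitempty"`
}

// decodeEntries reads newline-delimited JSON objects until EOF, using the
// same Decode loop as ReadTaskLogs.
func decodeEntries(r io.Reader) ([]logEntry, error) {
	var entries []logEntry
	dec := json.NewDecoder(r)
	for {
		var e logEntry
		if err := dec.Decode(&e); err != nil {
			if err == io.EOF {
				return entries, nil
			}
			return nil, fmt.Errorf("failed to decode log entry: %w", err)
		}
		entries = append(entries, e)
	}
}

// decodeString is a hypothetical helper for in-memory input.
func decodeString(s string) ([]logEntry, error) {
	return decodeEntries(strings.NewReader(s))
}

func main() {
	// Invented sample lines in the shape writeLogEntry produces.
	sample := `{"timestamp":"2025-07-30T12:00:00Z","level":"INFO","message":"started"}
{"timestamp":"2025-07-30T12:00:05Z","level":"INFO","message":"halfway","progress":50}
`
	entries, err := decodeString(sample)
	if err != nil {
		panic(err)
	}
	for _, e := range entries {
		fmt.Println(e.Level, e.Message, e.Progress != nil)
	}
}
```

Because the decoder consumes a stream of values, a log that was truncated mid-entry surfaces as a decode error rather than a clean io.EOF, which matches the error path in ReadTaskLogs.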