Diffstat (limited to 'weed/worker')
-rw-r--r--  weed/worker/client.go | 307
-rw-r--r--  weed/worker/client_test.go | 111
-rw-r--r--  weed/worker/client_tls_test.go | 146
-rw-r--r--  weed/worker/tasks/balance/balance.go | 65
-rw-r--r--  weed/worker/tasks/balance/balance_detector.go | 171
-rw-r--r--  weed/worker/tasks/balance/balance_register.go | 109
-rw-r--r--  weed/worker/tasks/balance/balance_scheduler.go | 197
-rw-r--r--  weed/worker/tasks/balance/balance_typed.go | 156
-rw-r--r--  weed/worker/tasks/balance/config.go | 170
-rw-r--r--  weed/worker/tasks/balance/detection.go | 134
-rw-r--r--  weed/worker/tasks/balance/ui.go | 361
-rw-r--r--  weed/worker/tasks/base/generic_components.go | 129
-rw-r--r--  weed/worker/tasks/base/registration.go | 155
-rw-r--r--  weed/worker/tasks/base/task_definition.go | 272
-rw-r--r--  weed/worker/tasks/base/task_definition_test.go | 338
-rw-r--r--  weed/worker/tasks/base/typed_task.go | 218
-rw-r--r--  weed/worker/tasks/config_update_registry.go | 67
-rw-r--r--  weed/worker/tasks/erasure_coding/config.go | 207
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go | 140
-rw-r--r--  weed/worker/tasks/erasure_coding/ec.go | 792
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_detector.go | 139
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_register.go | 109
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_scheduler.go | 114
-rw-r--r--  weed/worker/tasks/erasure_coding/ui.go | 309
-rw-r--r--  weed/worker/tasks/schema_provider.go | 51
-rw-r--r--  weed/worker/tasks/task.go | 198
-rw-r--r--  weed/worker/tasks/task_log_handler.go | 230
-rw-r--r--  weed/worker/tasks/task_logger.go | 432
-rw-r--r--  weed/worker/tasks/ui_base.go | 184
-rw-r--r--  weed/worker/tasks/vacuum/config.go | 190
-rw-r--r--  weed/worker/tasks/vacuum/detection.go | 112
-rw-r--r--  weed/worker/tasks/vacuum/ui.go | 314
-rw-r--r--  weed/worker/tasks/vacuum/vacuum.go | 195
-rw-r--r--  weed/worker/tasks/vacuum/vacuum_detector.go | 132
-rw-r--r--  weed/worker/tasks/vacuum/vacuum_register.go | 109
-rw-r--r--  weed/worker/tasks/vacuum/vacuum_scheduler.go | 111
-rw-r--r--  weed/worker/types/config_types.go | 4
-rw-r--r--  weed/worker/types/data_types.go | 2
-rw-r--r--  weed/worker/types/task_types.go | 74
-rw-r--r--  weed/worker/types/task_ui.go | 264
-rw-r--r--  weed/worker/types/typed_task_interface.go | 121
-rw-r--r--  weed/worker/worker.go | 466
42 files changed, 5404 insertions(+), 2701 deletions(-)
diff --git a/weed/worker/client.go b/weed/worker/client.go
index 60b33fb31..53854c6e3 100644
--- a/weed/worker/client.go
+++ b/weed/worker/client.go
@@ -80,6 +80,21 @@ func (c *GrpcAdminClient) Connect() error {
return fmt.Errorf("already connected")
}
+ // Always start the reconnection loop, even if initial connection fails
+ go c.reconnectionLoop()
+
+ // Attempt initial connection
+ err := c.attemptConnection()
+ if err != nil {
+ glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err)
+ return err
+ }
+
+ return nil
+}
+
+// attemptConnection tries to establish the connection without managing the reconnection loop
+func (c *GrpcAdminClient) attemptConnection() error {
// Detect TLS support and create appropriate connection
conn, err := c.createConnection()
if err != nil {
@@ -100,10 +115,34 @@ func (c *GrpcAdminClient) Connect() error {
c.stream = stream
c.connected = true
- // Start stream handlers and reconnection loop
- go c.handleOutgoing()
- go c.handleIncoming()
- go c.reconnectionLoop()
+ // If worker info is already available, send registration as the very first message on the stream
+ c.mutex.RLock()
+ workerInfo := c.lastWorkerInfo
+ c.mutex.RUnlock()
+
+ if workerInfo != nil {
+ // Send registration synchronously as the very first message
+ if err := c.sendRegistrationSync(workerInfo); err != nil {
+ c.conn.Close()
+ c.connected = false
+ return fmt.Errorf("failed to register worker: %w", err)
+ }
+ glog.Infof("Worker registered successfully with admin server")
+ } else {
+ // No worker info yet - stream will wait for registration
+ glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
+ }
+
+ // Start stream handlers with synchronization
+ outgoingReady := make(chan struct{})
+ incomingReady := make(chan struct{})
+
+ go c.handleOutgoingWithReady(outgoingReady)
+ go c.handleIncomingWithReady(incomingReady)
+
+ // Wait for both handlers to be ready
+ <-outgoingReady
+ <-incomingReady
glog.Infof("Connected to admin server at %s", c.adminAddress)
return nil
@@ -268,53 +307,16 @@ func (c *GrpcAdminClient) reconnect() error {
if c.conn != nil {
c.conn.Close()
}
+ c.connected = false
c.mutex.Unlock()
- // Create new connection
- conn, err := c.createConnection()
+ // Attempt to re-establish connection using the same logic as initial connection
+ err := c.attemptConnection()
if err != nil {
- return fmt.Errorf("failed to create connection: %w", err)
- }
-
- client := worker_pb.NewWorkerServiceClient(conn)
-
- // Create new stream
- streamCtx, streamCancel := context.WithCancel(context.Background())
- stream, err := client.WorkerStream(streamCtx)
- if err != nil {
- conn.Close()
- streamCancel()
- return fmt.Errorf("failed to create stream: %w", err)
- }
-
- // Update client state
- c.mutex.Lock()
- c.conn = conn
- c.client = client
- c.stream = stream
- c.streamCtx = streamCtx
- c.streamCancel = streamCancel
- c.connected = true
- c.mutex.Unlock()
-
- // Restart stream handlers
- go c.handleOutgoing()
- go c.handleIncoming()
-
- // Re-register worker if we have previous registration info
- c.mutex.RLock()
- workerInfo := c.lastWorkerInfo
- c.mutex.RUnlock()
-
- if workerInfo != nil {
- glog.Infof("Re-registering worker after reconnection...")
- if err := c.sendRegistration(workerInfo); err != nil {
- glog.Errorf("Failed to re-register worker: %v", err)
- // Don't fail the reconnection because of registration failure
- // The registration will be retried on next heartbeat or operation
- }
+ return fmt.Errorf("failed to reconnect: %w", err)
}
+ // Registration is now handled in attemptConnection if worker info is available
return nil
}
@@ -340,8 +342,19 @@ func (c *GrpcAdminClient) handleOutgoing() {
}
}
+// handleOutgoingWithReady processes outgoing messages and signals when ready
+func (c *GrpcAdminClient) handleOutgoingWithReady(ready chan struct{}) {
+ // Signal that this handler is ready to process messages
+ close(ready)
+
+ // Now process messages normally
+ c.handleOutgoing()
+}
+
// handleIncoming processes incoming messages from admin
func (c *GrpcAdminClient) handleIncoming() {
+ glog.V(1).Infof("📡 INCOMING HANDLER STARTED: Worker %s incoming message handler started", c.workerID)
+
for {
c.mutex.RLock()
connected := c.connected
@@ -349,15 +362,17 @@ func (c *GrpcAdminClient) handleIncoming() {
c.mutex.RUnlock()
if !connected {
+ glog.V(1).Infof("🔌 INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - not connected", c.workerID)
break
}
+ glog.V(4).Infof("👂 LISTENING: Worker %s waiting for message from admin server", c.workerID)
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
- glog.Infof("Admin server closed the stream")
+ glog.Infof("🔚 STREAM CLOSED: Worker %s admin server closed the stream", c.workerID)
} else {
- glog.Errorf("Failed to receive message from admin: %v", err)
+ glog.Errorf("❌ RECEIVE ERROR: Worker %s failed to receive message from admin: %v", c.workerID, err)
}
c.mutex.Lock()
c.connected = false
@@ -365,26 +380,42 @@ func (c *GrpcAdminClient) handleIncoming() {
break
}
+ glog.V(4).Infof("📨 MESSAGE RECEIVED: Worker %s received message from admin server: %T", c.workerID, msg.Message)
+
// Route message to waiting goroutines or general handler
select {
case c.incoming <- msg:
+ glog.V(3).Infof("✅ MESSAGE ROUTED: Worker %s successfully routed message to handler", c.workerID)
case <-time.After(time.Second):
- glog.Warningf("Incoming message buffer full, dropping message")
+ glog.Warningf("🚫 MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", c.workerID, msg.Message)
}
}
+
+ glog.V(1).Infof("🏁 INCOMING HANDLER FINISHED: Worker %s incoming message handler finished", c.workerID)
+}
+
+// handleIncomingWithReady processes incoming messages and signals when ready
+func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) {
+ // Signal that this handler is ready to process messages
+ close(ready)
+
+ // Now process messages normally
+ c.handleIncoming()
}
// RegisterWorker registers the worker with the admin server
func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error {
- if !c.connected {
- return fmt.Errorf("not connected to admin server")
- }
-
// Store worker info for re-registration after reconnection
c.mutex.Lock()
c.lastWorkerInfo = worker
c.mutex.Unlock()
+ // If not connected, registration will happen when connection is established
+ if !c.connected {
+ glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection")
+ return nil
+ }
+
return c.sendRegistration(worker)
}
@@ -435,9 +466,88 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error {
}
}
+// sendRegistrationSync sends the registration message synchronously
+func (c *GrpcAdminClient) sendRegistrationSync(worker *types.Worker) error {
+ capabilities := make([]string, len(worker.Capabilities))
+ for i, cap := range worker.Capabilities {
+ capabilities[i] = string(cap)
+ }
+
+ msg := &worker_pb.WorkerMessage{
+ WorkerId: c.workerID,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_Registration{
+ Registration: &worker_pb.WorkerRegistration{
+ WorkerId: c.workerID,
+ Address: worker.Address,
+ Capabilities: capabilities,
+ MaxConcurrent: int32(worker.MaxConcurrent),
+ Metadata: make(map[string]string),
+ },
+ },
+ }
+
+ // Send directly to stream to ensure it's the first message
+ if err := c.stream.Send(msg); err != nil {
+ return fmt.Errorf("failed to send registration message: %w", err)
+ }
+
+ // Create a channel to receive the response
+ responseChan := make(chan *worker_pb.AdminMessage, 1)
+ errChan := make(chan error, 1)
+
+ // Start a goroutine to listen for the response
+ go func() {
+ for {
+ response, err := c.stream.Recv()
+ if err != nil {
+ errChan <- fmt.Errorf("failed to receive registration response: %w", err)
+ return
+ }
+
+ if regResp := response.GetRegistrationResponse(); regResp != nil {
+ responseChan <- response
+ return
+ }
+ // Continue waiting if it's not a registration response
+ }
+ }()
+
+ // Wait for registration response with timeout
+ timeout := time.NewTimer(10 * time.Second)
+ defer timeout.Stop()
+
+ select {
+ case response := <-responseChan:
+ if regResp := response.GetRegistrationResponse(); regResp != nil {
+ if regResp.Success {
+ glog.V(1).Infof("Worker registered successfully: %s", regResp.Message)
+ return nil
+ }
+ return fmt.Errorf("registration failed: %s", regResp.Message)
+ }
+ return fmt.Errorf("unexpected response type")
+ case err := <-errChan:
+ return err
+ case <-timeout.C:
+ return fmt.Errorf("registration timeout")
+ }
+}
+
// SendHeartbeat sends heartbeat to admin server
func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just skip the heartbeat
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("Skipping heartbeat during reconnection")
+ return nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(10 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %w", err)
@@ -477,6 +587,17 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta
// RequestTask requests a new task from admin server
func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just return no task
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("🔄 RECONNECTING: Worker %s skipping task request during reconnection", workerID)
+ return nil, nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return nil, fmt.Errorf("not connected to admin server: %w", err)
@@ -488,6 +609,9 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
caps[i] = string(cap)
}
+ glog.V(3).Infof("📤 SENDING TASK REQUEST: Worker %s sending task request to admin server with capabilities: %v",
+ workerID, capabilities)
+
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
@@ -502,23 +626,24 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
select {
case c.outgoing <- msg:
+ glog.V(3).Infof("✅ TASK REQUEST SENT: Worker %s successfully sent task request to admin server", workerID)
case <-time.After(time.Second):
+ glog.Errorf("❌ TASK REQUEST TIMEOUT: Worker %s failed to send task request: timeout", workerID)
return nil, fmt.Errorf("failed to send task request: timeout")
}
// Wait for task assignment
+ glog.V(3).Infof("⏳ WAITING FOR RESPONSE: Worker %s waiting for task assignment response (5s timeout)", workerID)
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
for {
select {
case response := <-c.incoming:
+ glog.V(3).Infof("📨 RESPONSE RECEIVED: Worker %s received response from admin server: %T", workerID, response.Message)
if taskAssign := response.GetTaskAssignment(); taskAssign != nil {
- // Convert parameters map[string]string to map[string]interface{}
- parameters := make(map[string]interface{})
- for k, v := range taskAssign.Params.Parameters {
- parameters[k] = v
- }
+ glog.V(1).Infof("Worker %s received task assignment in response: %s (type: %s, volume: %d)",
+ workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
// Convert to our task type
task := &types.Task{
@@ -530,11 +655,15 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
Collection: taskAssign.Params.Collection,
Priority: types.TaskPriority(taskAssign.Priority),
CreatedAt: time.Unix(taskAssign.CreatedTime, 0),
- Parameters: parameters,
+ // Use typed protobuf parameters directly
+ TypedParams: taskAssign.Params,
}
return task, nil
+ } else {
+ glog.V(3).Infof("📭 NON-TASK RESPONSE: Worker %s received non-task response: %T", workerID, response.Message)
}
case <-timeout.C:
+ glog.V(3).Infof("⏰ TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 5 seconds", workerID)
return nil, nil // No task available
}
}
@@ -542,24 +671,47 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
// CompleteTask reports task completion to admin server
func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
+ return c.CompleteTaskWithMetadata(taskID, success, errorMsg, nil)
+}
+
+// CompleteTaskWithMetadata reports task completion with additional metadata
+func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just skip the completion report
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID)
+ return nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %w", err)
}
}
+ taskComplete := &worker_pb.TaskComplete{
+ TaskId: taskID,
+ WorkerId: c.workerID,
+ Success: success,
+ ErrorMessage: errorMsg,
+ CompletionTime: time.Now().Unix(),
+ }
+
+ // Add metadata if provided
+ if metadata != nil {
+ taskComplete.ResultMetadata = metadata
+ }
+
msg := &worker_pb.WorkerMessage{
WorkerId: c.workerID,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_TaskComplete{
- TaskComplete: &worker_pb.TaskComplete{
- TaskId: taskID,
- WorkerId: c.workerID,
- Success: success,
- ErrorMessage: errorMsg,
- CompletionTime: time.Now().Unix(),
- },
+ TaskComplete: taskComplete,
},
}
@@ -574,6 +726,17 @@ func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg str
// UpdateTaskProgress updates task progress to admin server
func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
if !c.connected {
+ // If we're currently reconnecting, don't wait - just skip the progress update
+ c.mutex.RLock()
+ reconnecting := c.reconnecting
+ c.mutex.RUnlock()
+
+ if reconnecting {
+ // Don't treat as an error - reconnection is in progress
+ glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID)
+ return nil
+ }
+
// Wait for reconnection for a short time
if err := c.waitForConnection(5 * time.Second); err != nil {
return fmt.Errorf("not connected to admin server: %w", err)
@@ -663,6 +826,12 @@ func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
return fmt.Errorf("timeout waiting for connection")
}
+// GetIncomingChannel returns the incoming message channel for message processing
+// This allows the worker to process admin messages directly
+func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage {
+ return c.incoming
+}
+
// MockAdminClient provides a mock implementation for testing
type MockAdminClient struct {
workerID string
@@ -741,6 +910,12 @@ func (m *MockAdminClient) UpdateTaskProgress(taskID string, progress float64) er
return nil
}
+// CompleteTaskWithMetadata mock implementation
+func (m *MockAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
+ glog.Infof("Mock: Task %s completed: success=%v, error=%s, metadata=%v", taskID, success, errorMsg, metadata)
+ return nil
+}
+
// IsConnected mock implementation
func (m *MockAdminClient) IsConnected() bool {
m.mutex.RLock()
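
The Connect() changes above replace fire-and-forget goroutine startup with a ready-channel handshake: each stream handler closes its channel just before entering its send/receive loop, and Connect() blocks until both are live. A minimal, self-contained sketch of that pattern follows; the names are invented for illustration and are not part of the SeaweedFS API.

package main

import "fmt"

// startHandler closes ready immediately before entering its loop, so the
// caller can block until the handler is actually live.
func startHandler(ready chan struct{}, done chan<- struct{}, name string) {
	go func() {
		close(ready)
		fmt.Println(name, "loop running")
		done <- struct{}{}
	}()
}

func main() {
	outgoingReady := make(chan struct{})
	incomingReady := make(chan struct{})
	done := make(chan struct{}, 2)

	startHandler(outgoingReady, done, "outgoing")
	startHandler(incomingReady, done, "incoming")

	// Mirror Connect(): block until both handlers have signaled readiness.
	<-outgoingReady
	<-incomingReady
	fmt.Println("both handlers ready")
	<-done
	<-done
}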
diff --git a/weed/worker/client_test.go b/weed/worker/client_test.go
deleted file mode 100644
index c57ea0240..000000000
--- a/weed/worker/client_test.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package worker
-
-import (
- "context"
- "testing"
-
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
-)
-
-func TestGrpcConnection(t *testing.T) {
- // Test that we can create a gRPC connection with insecure credentials
- // This tests the connection setup without requiring a running server
- adminAddress := "localhost:33646" // gRPC port for admin server on port 23646
-
- // This should not fail with transport security errors
- conn, err := pb.GrpcDial(context.Background(), adminAddress, false, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- // Connection failure is expected when no server is running
- // But it should NOT be a transport security error
- if err.Error() == "grpc: no transport security set" {
- t.Fatalf("Transport security error should not occur with insecure credentials: %v", err)
- }
- t.Logf("Connection failed as expected (no server running): %v", err)
- } else {
- // If connection succeeds, clean up
- conn.Close()
- t.Log("Connection succeeded")
- }
-}
-
-func TestGrpcAdminClient_Connect(t *testing.T) {
- // Test that the GrpcAdminClient can be created and attempt connection
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client := NewGrpcAdminClient("localhost:23646", "test-worker", dialOption)
-
- // This should not fail with transport security errors
- err := client.Connect()
- if err != nil {
- // Connection failure is expected when no server is running
- // But it should NOT be a transport security error
- if err.Error() == "grpc: no transport security set" {
- t.Fatalf("Transport security error should not occur with insecure credentials: %v", err)
- }
- t.Logf("Connection failed as expected (no server running): %v", err)
- } else {
- // If connection succeeds, clean up
- client.Disconnect()
- t.Log("Connection succeeded")
- }
-}
-
-func TestAdminAddressToGrpcAddress(t *testing.T) {
- tests := []struct {
- adminAddress string
- expected string
- }{
- {"localhost:9333", "localhost:19333"},
- {"localhost:23646", "localhost:33646"},
- {"admin.example.com:9333", "admin.example.com:19333"},
- {"127.0.0.1:8080", "127.0.0.1:18080"},
- }
-
- for _, test := range tests {
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client := NewGrpcAdminClient(test.adminAddress, "test-worker", dialOption)
- result := client.adminAddress
- if result != test.expected {
- t.Errorf("For admin address %s, expected gRPC address %s, got %s",
- test.adminAddress, test.expected, result)
- }
- }
-}
-
-func TestMockAdminClient(t *testing.T) {
- // Test that the mock client works correctly
- client := NewMockAdminClient()
-
- // Should be able to connect/disconnect without errors
- err := client.Connect()
- if err != nil {
- t.Fatalf("Mock client connect failed: %v", err)
- }
-
- if !client.IsConnected() {
- t.Error("Mock client should be connected")
- }
-
- err = client.Disconnect()
- if err != nil {
- t.Fatalf("Mock client disconnect failed: %v", err)
- }
-
- if client.IsConnected() {
- t.Error("Mock client should be disconnected")
- }
-}
-
-func TestCreateAdminClient(t *testing.T) {
- // Test client creation
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client, err := CreateAdminClient("localhost:9333", "test-worker", dialOption)
- if err != nil {
- t.Fatalf("Failed to create admin client: %v", err)
- }
-
- if client == nil {
- t.Fatal("Client should not be nil")
- }
-}
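
The deleted test above guarded against one regression in particular: dialing without credentials fails with "grpc: no transport security set". The fix it verified is simply passing explicit insecure credentials as a dial option. A standalone sketch of such a dial, using only public grpc-go API; grpc.Dial is used here for brevity, whereas SeaweedFS itself dials through the pb.GrpcDial wrapper.

package main

import (
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Explicit insecure credentials avoid the "no transport security set" error.
	conn, err := grpc.Dial("localhost:33646",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		fmt.Println("dial setup failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("client created; the connection itself is established lazily")
}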
diff --git a/weed/worker/client_tls_test.go b/weed/worker/client_tls_test.go
deleted file mode 100644
index d95d5f4f5..000000000
--- a/weed/worker/client_tls_test.go
+++ /dev/null
@@ -1,146 +0,0 @@
-package worker
-
-import (
- "strings"
- "testing"
- "time"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
-)
-
-func TestGrpcClientTLSDetection(t *testing.T) {
- // Test that the client can be created with a dial option
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client := NewGrpcAdminClient("localhost:33646", "test-worker", dialOption)
-
- // Test that the client has the correct dial option
- if client.dialOption == nil {
- t.Error("Client should have a dial option")
- }
-
- t.Logf("Client created successfully with dial option")
-}
-
-func TestCreateAdminClientGrpc(t *testing.T) {
- // Test client creation - admin server port gets transformed to gRPC port
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client, err := CreateAdminClient("localhost:23646", "test-worker", dialOption)
- if err != nil {
- t.Fatalf("Failed to create admin client: %v", err)
- }
-
- if client == nil {
- t.Fatal("Client should not be nil")
- }
-
- // Verify it's the correct type
- grpcClient, ok := client.(*GrpcAdminClient)
- if !ok {
- t.Fatal("Client should be GrpcAdminClient type")
- }
-
- // The admin address should be transformed to the gRPC port (HTTP + 10000)
- expectedAddress := "localhost:33646" // 23646 + 10000
- if grpcClient.adminAddress != expectedAddress {
- t.Errorf("Expected admin address %s, got %s", expectedAddress, grpcClient.adminAddress)
- }
-
- if grpcClient.workerID != "test-worker" {
- t.Errorf("Expected worker ID test-worker, got %s", grpcClient.workerID)
- }
-}
-
-func TestConnectionTimeouts(t *testing.T) {
- // Test that connections have proper timeouts
- // Use localhost with a port that's definitely closed
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client := NewGrpcAdminClient("localhost:1", "test-worker", dialOption) // Port 1 is reserved and won't be open
-
- // Test that the connection creation fails when actually trying to use it
- start := time.Now()
- err := client.Connect() // This should fail when trying to establish the stream
- duration := time.Since(start)
-
- if err == nil {
- t.Error("Expected connection to closed port to fail")
- } else {
- t.Logf("Connection failed as expected: %v", err)
- }
-
- // Should fail quickly but not too quickly
- if duration > 10*time.Second {
- t.Errorf("Connection attempt took too long: %v", duration)
- }
-}
-
-func TestConnectionWithDialOption(t *testing.T) {
- // Test that the connection uses the provided dial option
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client := NewGrpcAdminClient("localhost:1", "test-worker", dialOption) // Port 1 is reserved and won't be open
-
- // Test the actual connection
- err := client.Connect()
- if err == nil {
- t.Error("Expected connection to closed port to fail")
- client.Disconnect() // Clean up if it somehow succeeded
- } else {
- t.Logf("Connection failed as expected: %v", err)
- }
-
- // The error should indicate a connection failure
- if err != nil && err.Error() != "" {
- t.Logf("Connection error message: %s", err.Error())
- // The error should contain connection-related terms
- if !strings.Contains(err.Error(), "connection") && !strings.Contains(err.Error(), "dial") {
- t.Logf("Error message doesn't indicate connection issues: %s", err.Error())
- }
- }
-}
-
-func TestClientWithSecureDialOption(t *testing.T) {
- // Test that the client correctly uses a secure dial option
- // This would normally use LoadClientTLS, but for testing we'll use insecure
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client := NewGrpcAdminClient("localhost:33646", "test-worker", dialOption)
-
- if client.dialOption == nil {
- t.Error("Client should have a dial option")
- }
-
- t.Logf("Client created successfully with dial option")
-}
-
-func TestConnectionWithRealAddress(t *testing.T) {
- // Test connection behavior with a real address that doesn't support gRPC
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client := NewGrpcAdminClient("www.google.com:80", "test-worker", dialOption) // HTTP port, not gRPC
-
- err := client.Connect()
- if err == nil {
- t.Log("Connection succeeded unexpectedly")
- client.Disconnect()
- } else {
- t.Logf("Connection failed as expected: %v", err)
- }
-}
-
-func TestDialOptionUsage(t *testing.T) {
- // Test that the provided dial option is used for connections
- dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
- client := NewGrpcAdminClient("localhost:1", "test-worker", dialOption) // Port 1 won't support gRPC at all
-
- // Verify the dial option is stored
- if client.dialOption == nil {
- t.Error("Dial option should be stored in client")
- }
-
- // Test connection fails appropriately
- err := client.Connect()
- if err == nil {
- t.Error("Connection should fail to non-gRPC port")
- client.Disconnect()
- } else {
- t.Logf("Connection failed as expected: %v", err)
- }
-}
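
Several of the deleted TLS tests pinned down the address convention that survives in the client: the admin gRPC port is the HTTP port plus 10000 (9333 -> 19333, 23646 -> 33646, 8080 -> 18080). A hedged sketch of that transformation, assuming host:port input; this is illustrative, not the actual SeaweedFS helper.

package main

import (
	"fmt"
	"net"
	"strconv"
)

// toGrpcAddress shifts an admin HTTP address to its gRPC port (HTTP + 10000).
func toGrpcAddress(adminAddress string) (string, error) {
	host, portStr, err := net.SplitHostPort(adminAddress)
	if err != nil {
		return "", err
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return "", err
	}
	return net.JoinHostPort(host, strconv.Itoa(port+10000)), nil
}

func main() {
	for _, addr := range []string{"localhost:9333", "localhost:23646", "127.0.0.1:8080"} {
		grpcAddr, err := toGrpcAddress(addr)
		if err != nil {
			fmt.Println("bad address:", err)
			continue
		}
		fmt.Println(addr, "->", grpcAddr) // e.g. localhost:9333 -> localhost:19333
	}
}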
diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go
index ea867d950..0becb3415 100644
--- a/weed/worker/tasks/balance/balance.go
+++ b/weed/worker/tasks/balance/balance.go
@@ -1,6 +1,7 @@
package balance
import (
+ "context"
"fmt"
"time"
@@ -15,6 +16,9 @@ type Task struct {
server string
volumeID uint32
collection string
+
+ // Task parameters for accessing planned destinations
+ taskParams types.TaskParams
}
// NewTask creates a new balance task instance
@@ -30,7 +34,31 @@ func NewTask(server string, volumeID uint32, collection string) *Task {
// Execute executes the balance task
func (t *Task) Execute(params types.TaskParams) error {
- glog.Infof("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection)
+ // Use BaseTask.ExecuteTask to handle logging initialization
+ return t.ExecuteTask(context.Background(), params, t.executeImpl)
+}
+
+// executeImpl is the actual balance implementation
+func (t *Task) executeImpl(ctx context.Context, params types.TaskParams) error {
+ // Store task parameters for accessing planned destinations
+ t.taskParams = params
+
+ // Get planned destination
+ destNode := t.getPlannedDestination()
+ if destNode != "" {
+ t.LogWithFields("INFO", "Starting balance task with planned destination", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "source": t.server,
+ "destination": destNode,
+ "collection": t.collection,
+ })
+ } else {
+ t.LogWithFields("INFO", "Starting balance task without specific destination", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "collection": t.collection,
+ })
+ }
// Simulate balance operation with progress updates
steps := []struct {
@@ -46,18 +74,36 @@ func (t *Task) Execute(params types.TaskParams) error {
}
for _, step := range steps {
+ select {
+ case <-ctx.Done():
+ t.LogWarning("Balance task cancelled during step: %s", step.name)
+ return ctx.Err()
+ default:
+ }
+
if t.IsCancelled() {
+ t.LogWarning("Balance task cancelled by request during step: %s", step.name)
return fmt.Errorf("balance task cancelled")
}
- glog.V(1).Infof("Balance task step: %s", step.name)
+ t.LogWithFields("INFO", "Executing balance step", map[string]interface{}{
+ "step": step.name,
+ "progress": step.progress,
+ "duration": step.duration.String(),
+ "volume_id": t.volumeID,
+ })
t.SetProgress(step.progress)
// Simulate work
time.Sleep(step.duration)
}
- glog.Infof("Balance task completed for volume %d on server %s", t.volumeID, t.server)
+ t.LogWithFields("INFO", "Balance task completed successfully", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "collection": t.collection,
+ "final_progress": 100.0,
+ })
return nil
}
@@ -72,6 +118,19 @@ func (t *Task) Validate(params types.TaskParams) error {
return nil
}
+// getPlannedDestination extracts the planned destination node from task parameters
+func (t *Task) getPlannedDestination() string {
+ if t.taskParams.TypedParams != nil {
+ if balanceParams := t.taskParams.TypedParams.GetBalanceParams(); balanceParams != nil {
+ if balanceParams.DestNode != "" {
+ glog.V(2).Infof("Found planned destination for volume %d: %s", t.volumeID, balanceParams.DestNode)
+ return balanceParams.DestNode
+ }
+ }
+ }
+ return ""
+}
+
// EstimateTime estimates the time needed for the task
func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
// Base time for balance operation
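
The executeImpl loop above layers two cancellation checks: a non-blocking select on ctx.Done() plus the task-level IsCancelled() flag, so a cancelled task aborts between steps rather than mid-sleep. A compact, self-contained sketch of the context half of that pattern, with invented step names:

package main

import (
	"context"
	"fmt"
	"time"
)

// runSteps checks ctx between steps so cancellation aborts promptly instead
// of waiting out the remaining simulated work.
func runSteps(ctx context.Context, steps []string) error {
	for _, name := range steps {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		fmt.Println("executing:", name)
		time.Sleep(100 * time.Millisecond) // simulated work
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	if err := runSteps(ctx, []string{"analyze", "move", "verify", "commit"}); err != nil {
		fmt.Println("aborted:", err) // the 4th step never starts
	}
}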
diff --git a/weed/worker/tasks/balance/balance_detector.go b/weed/worker/tasks/balance/balance_detector.go
deleted file mode 100644
index f082b7a77..000000000
--- a/weed/worker/tasks/balance/balance_detector.go
+++ /dev/null
@@ -1,171 +0,0 @@
-package balance
-
-import (
- "fmt"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// BalanceDetector implements TaskDetector for balance tasks
-type BalanceDetector struct {
- enabled bool
- threshold float64 // Imbalance threshold (0.1 = 10%)
- minCheckInterval time.Duration
- minVolumeCount int
- lastCheck time.Time
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*BalanceDetector)(nil)
-)
-
-// NewBalanceDetector creates a new balance detector
-func NewBalanceDetector() *BalanceDetector {
- return &BalanceDetector{
- enabled: true,
- threshold: 0.1, // 10% imbalance threshold
- minCheckInterval: 1 * time.Hour,
- minVolumeCount: 10, // Don't balance small clusters
- lastCheck: time.Time{},
- }
-}
-
-// GetTaskType returns the task type
-func (d *BalanceDetector) GetTaskType() types.TaskType {
- return types.TaskTypeBalance
-}
-
-// ScanForTasks checks if cluster balance is needed
-func (d *BalanceDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- return nil, nil
- }
-
- glog.V(2).Infof("Scanning for balance tasks...")
-
- // Don't check too frequently
- if time.Since(d.lastCheck) < d.minCheckInterval {
- return nil, nil
- }
- d.lastCheck = time.Now()
-
- // Skip if cluster is too small
- if len(volumeMetrics) < d.minVolumeCount {
- glog.V(2).Infof("Cluster too small for balance (%d volumes < %d minimum)", len(volumeMetrics), d.minVolumeCount)
- return nil, nil
- }
-
- // Analyze volume distribution across servers
- serverVolumeCounts := make(map[string]int)
- for _, metric := range volumeMetrics {
- serverVolumeCounts[metric.Server]++
- }
-
- if len(serverVolumeCounts) < 2 {
- glog.V(2).Infof("Not enough servers for balance (%d servers)", len(serverVolumeCounts))
- return nil, nil
- }
-
- // Calculate balance metrics
- totalVolumes := len(volumeMetrics)
- avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
-
- maxVolumes := 0
- minVolumes := totalVolumes
- maxServer := ""
- minServer := ""
-
- for server, count := range serverVolumeCounts {
- if count > maxVolumes {
- maxVolumes = count
- maxServer = server
- }
- if count < minVolumes {
- minVolumes = count
- minServer = server
- }
- }
-
- // Check if imbalance exceeds threshold
- imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
- if imbalanceRatio <= d.threshold {
- glog.V(2).Infof("Cluster is balanced (imbalance ratio: %.2f <= %.2f)", imbalanceRatio, d.threshold)
- return nil, nil
- }
-
- // Create balance task
- reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
- imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
-
- task := &types.TaskDetectionResult{
- TaskType: types.TaskTypeBalance,
- Priority: types.TaskPriorityNormal,
- Reason: reason,
- ScheduleAt: time.Now(),
- Parameters: map[string]interface{}{
- "imbalance_ratio": imbalanceRatio,
- "threshold": d.threshold,
- "max_volumes": maxVolumes,
- "min_volumes": minVolumes,
- "avg_volumes_per_server": avgVolumesPerServer,
- "max_server": maxServer,
- "min_server": minServer,
- "total_servers": len(serverVolumeCounts),
- },
- }
-
- glog.V(1).Infof("🔄 Found balance task: %s", reason)
- return []*types.TaskDetectionResult{task}, nil
-}
-
-// ScanInterval returns how often to scan
-func (d *BalanceDetector) ScanInterval() time.Duration {
- return d.minCheckInterval
-}
-
-// IsEnabled returns whether the detector is enabled
-func (d *BalanceDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// SetEnabled sets whether the detector is enabled
-func (d *BalanceDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
- glog.V(1).Infof("🔄 Balance detector enabled: %v", enabled)
-}
-
-// SetThreshold sets the imbalance threshold
-func (d *BalanceDetector) SetThreshold(threshold float64) {
- d.threshold = threshold
- glog.V(1).Infof("🔄 Balance threshold set to: %.1f%%", threshold*100)
-}
-
-// SetMinCheckInterval sets the minimum time between balance checks
-func (d *BalanceDetector) SetMinCheckInterval(interval time.Duration) {
- d.minCheckInterval = interval
- glog.V(1).Infof("🔄 Balance check interval set to: %v", interval)
-}
-
-// SetMinVolumeCount sets the minimum volume count for balance operations
-func (d *BalanceDetector) SetMinVolumeCount(count int) {
- d.minVolumeCount = count
- glog.V(1).Infof("🔄 Balance minimum volume count set to: %d", count)
-}
-
-// GetThreshold returns the current imbalance threshold
-func (d *BalanceDetector) GetThreshold() float64 {
- return d.threshold
-}
-
-// GetMinCheckInterval returns the minimum check interval
-func (d *BalanceDetector) GetMinCheckInterval() time.Duration {
- return d.minCheckInterval
-}
-
-// GetMinVolumeCount returns the minimum volume count
-func (d *BalanceDetector) GetMinVolumeCount() int {
- return d.minVolumeCount
-}
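
Both the deleted detector above and its replacement in detection.go below score the cluster the same way: imbalance ratio = (max - min) / average volumes per server, compared against a threshold. A self-contained sketch of that calculation with made-up counts:

package main

import "fmt"

// imbalanceRatio computes (max - min) / average volumes per server.
func imbalanceRatio(serverVolumeCounts map[string]int) float64 {
	total := 0
	max, min := 0, int(^uint(0)>>1)
	for _, c := range serverVolumeCounts {
		total += c
		if c > max {
			max = c
		}
		if c < min {
			min = c
		}
	}
	avg := float64(total) / float64(len(serverVolumeCounts))
	return float64(max-min) / avg
}

func main() {
	counts := map[string]int{"srv1": 12, "srv2": 8, "srv3": 10}
	// (12 - 8) / 10 = 0.40, which exceeds the new default threshold of 0.2.
	fmt.Printf("imbalance: %.2f\n", imbalanceRatio(counts))
}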
diff --git a/weed/worker/tasks/balance/balance_register.go b/weed/worker/tasks/balance/balance_register.go
index 7c2d5a520..b26a40782 100644
--- a/weed/worker/tasks/balance/balance_register.go
+++ b/weed/worker/tasks/balance/balance_register.go
@@ -2,80 +2,71 @@ package balance
import (
"fmt"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
-// Factory creates balance task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
+// Global variable to hold the task definition for configuration updates
+var globalTaskDef *base.TaskDefinition
-// NewFactory creates a new balance task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeBalance,
- []string{"balance", "storage", "optimization"},
- "Balance data across volume servers for optimal performance",
- ),
- }
-}
-
-// Create creates a new balance task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID, params.Collection)
- task.SetEstimatedDuration(task.EstimateTime(params))
+// Auto-register this task when the package is imported
+func init() {
+ RegisterBalanceTask()
- return task, nil
+ // Register config updater
+ tasks.AutoRegisterConfigUpdater(types.TaskTypeBalance, UpdateConfigFromPersistence)
}
-// Shared detector and scheduler instances
-var (
- sharedDetector *BalanceDetector
- sharedScheduler *BalanceScheduler
-)
+// RegisterBalanceTask registers the balance task with the new architecture
+func RegisterBalanceTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*BalanceDetector, *BalanceScheduler) {
- if sharedDetector == nil {
- sharedDetector = NewBalanceDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewBalanceScheduler()
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeBalance,
+ Name: "balance",
+ DisplayName: "Volume Balance",
+ Description: "Balances volume distribution across servers",
+ Icon: "fas fa-balance-scale text-warning",
+ Capabilities: []string{"balance", "distribution"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: CreateTask,
+ DetectionFunc: Detection,
+ ScanInterval: 30 * time.Minute,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 1,
+ RepeatInterval: 2 * time.Hour,
}
- return sharedDetector, sharedScheduler
-}
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*BalanceDetector, *BalanceScheduler) {
- return getSharedInstances()
+ // Store task definition globally for configuration updates
+ globalTaskDef = taskDef
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
-// Auto-register this task when the package is imported
-func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeBalance, factory)
+// UpdateConfigFromPersistence updates the balance configuration from persistence
+func UpdateConfigFromPersistence(configPersistence interface{}) error {
+ if globalTaskDef == nil {
+ return fmt.Errorf("balance task not registered")
+ }
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
+ // Load configuration from persistence
+ newConfig := LoadConfigFromPersistence(configPersistence)
+ if newConfig == nil {
+ return fmt.Errorf("failed to load configuration from persistence")
+ }
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
+ // Update the task definition's config
+ globalTaskDef.Config = newConfig
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ glog.V(1).Infof("Updated balance task configuration from persistence")
+ return nil
}
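
balance_register.go now keeps the TaskDefinition in a package-level variable so that UpdateConfigFromPersistence can swap configurations long after the init-time registration. A minimal sketch of that register-once, update-later shape, with the types trimmed down for illustration:

package main

import "fmt"

// TaskDefinition stands in for base.TaskDefinition; only the field that
// matters for this sketch is kept.
type TaskDefinition struct {
	Config any
}

// globalTaskDef mirrors the package-level pointer in balance_register.go.
var globalTaskDef *TaskDefinition

func registerTask() {
	globalTaskDef = &TaskDefinition{Config: "defaults"}
}

func updateConfigFromPersistence(newConfig any) error {
	if globalTaskDef == nil {
		return fmt.Errorf("task not registered")
	}
	globalTaskDef.Config = newConfig
	return nil
}

func main() {
	registerTask()
	if err := updateConfigFromPersistence("loaded-from-persistence"); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(globalTaskDef.Config) // loaded-from-persistence
}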
diff --git a/weed/worker/tasks/balance/balance_scheduler.go b/weed/worker/tasks/balance/balance_scheduler.go
deleted file mode 100644
index a8fefe465..000000000
--- a/weed/worker/tasks/balance/balance_scheduler.go
+++ /dev/null
@@ -1,197 +0,0 @@
-package balance
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// BalanceScheduler implements TaskScheduler for balance tasks
-type BalanceScheduler struct {
- enabled bool
- maxConcurrent int
- minInterval time.Duration
- lastScheduled map[string]time.Time // track when we last scheduled a balance for each task type
- minServerCount int
- moveDuringOffHours bool
- offHoursStart string
- offHoursEnd string
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskScheduler = (*BalanceScheduler)(nil)
-)
-
-// NewBalanceScheduler creates a new balance scheduler
-func NewBalanceScheduler() *BalanceScheduler {
- return &BalanceScheduler{
- enabled: true,
- maxConcurrent: 1, // Only run one balance at a time
- minInterval: 6 * time.Hour,
- lastScheduled: make(map[string]time.Time),
- minServerCount: 3,
- moveDuringOffHours: true,
- offHoursStart: "23:00",
- offHoursEnd: "06:00",
- }
-}
-
-// GetTaskType returns the task type
-func (s *BalanceScheduler) GetTaskType() types.TaskType {
- return types.TaskTypeBalance
-}
-
-// CanScheduleNow determines if a balance task can be scheduled
-func (s *BalanceScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- if !s.enabled {
- return false
- }
-
- // Count running balance tasks
- runningBalanceCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeBalance {
- runningBalanceCount++
- }
- }
-
- // Check concurrency limit
- if runningBalanceCount >= s.maxConcurrent {
- glog.V(3).Infof("⏸️ Balance task blocked: too many running (%d >= %d)", runningBalanceCount, s.maxConcurrent)
- return false
- }
-
- // Check minimum interval between balance operations
- if lastTime, exists := s.lastScheduled["balance"]; exists {
- if time.Since(lastTime) < s.minInterval {
- timeLeft := s.minInterval - time.Since(lastTime)
- glog.V(3).Infof("⏸️ Balance task blocked: too soon (wait %v)", timeLeft)
- return false
- }
- }
-
- // Check if we have available workers
- availableWorkerCount := 0
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeBalance {
- availableWorkerCount++
- break
- }
- }
- }
-
- if availableWorkerCount == 0 {
- glog.V(3).Infof("⏸️ Balance task blocked: no available workers")
- return false
- }
-
- // All checks passed - can schedule
- s.lastScheduled["balance"] = time.Now()
- glog.V(2).Infof("✅ Balance task can be scheduled (running: %d/%d, workers: %d)",
- runningBalanceCount, s.maxConcurrent, availableWorkerCount)
- return true
-}
-
-// GetPriority returns the priority for balance tasks
-func (s *BalanceScheduler) GetPriority(task *types.Task) types.TaskPriority {
- // Balance is typically normal priority - not urgent but important for optimization
- return types.TaskPriorityNormal
-}
-
-// GetMaxConcurrent returns the maximum concurrent balance tasks
-func (s *BalanceScheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating balance tasks
-func (s *BalanceScheduler) GetDefaultRepeatInterval() time.Duration {
- return s.minInterval
-}
-
-// IsEnabled returns whether the scheduler is enabled
-func (s *BalanceScheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// SetEnabled sets whether the scheduler is enabled
-func (s *BalanceScheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
- glog.V(1).Infof("🔄 Balance scheduler enabled: %v", enabled)
-}
-
-// SetMaxConcurrent sets the maximum concurrent balance tasks
-func (s *BalanceScheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
- glog.V(1).Infof("🔄 Balance max concurrent set to: %d", max)
-}
-
-// SetMinInterval sets the minimum interval between balance operations
-func (s *BalanceScheduler) SetMinInterval(interval time.Duration) {
- s.minInterval = interval
- glog.V(1).Infof("🔄 Balance minimum interval set to: %v", interval)
-}
-
-// GetLastScheduled returns when we last scheduled this task type
-func (s *BalanceScheduler) GetLastScheduled(taskKey string) time.Time {
- if lastTime, exists := s.lastScheduled[taskKey]; exists {
- return lastTime
- }
- return time.Time{}
-}
-
-// SetLastScheduled updates when we last scheduled this task type
-func (s *BalanceScheduler) SetLastScheduled(taskKey string, when time.Time) {
- s.lastScheduled[taskKey] = when
-}
-
-// GetMinServerCount returns the minimum server count
-func (s *BalanceScheduler) GetMinServerCount() int {
- return s.minServerCount
-}
-
-// SetMinServerCount sets the minimum server count
-func (s *BalanceScheduler) SetMinServerCount(count int) {
- s.minServerCount = count
- glog.V(1).Infof("🔄 Balance minimum server count set to: %d", count)
-}
-
-// GetMoveDuringOffHours returns whether to move only during off-hours
-func (s *BalanceScheduler) GetMoveDuringOffHours() bool {
- return s.moveDuringOffHours
-}
-
-// SetMoveDuringOffHours sets whether to move only during off-hours
-func (s *BalanceScheduler) SetMoveDuringOffHours(enabled bool) {
- s.moveDuringOffHours = enabled
- glog.V(1).Infof("🔄 Balance move during off-hours: %v", enabled)
-}
-
-// GetOffHoursStart returns the off-hours start time
-func (s *BalanceScheduler) GetOffHoursStart() string {
- return s.offHoursStart
-}
-
-// SetOffHoursStart sets the off-hours start time
-func (s *BalanceScheduler) SetOffHoursStart(start string) {
- s.offHoursStart = start
- glog.V(1).Infof("🔄 Balance off-hours start time set to: %s", start)
-}
-
-// GetOffHoursEnd returns the off-hours end time
-func (s *BalanceScheduler) GetOffHoursEnd() string {
- return s.offHoursEnd
-}
-
-// SetOffHoursEnd sets the off-hours end time
-func (s *BalanceScheduler) SetOffHoursEnd(end string) {
- s.offHoursEnd = end
- glog.V(1).Infof("🔄 Balance off-hours end time set to: %s", end)
-}
-
-// GetMinInterval returns the minimum interval
-func (s *BalanceScheduler) GetMinInterval() time.Duration {
- return s.minInterval
-}
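
The admission check at the heart of the deleted scheduler survives in the new Scheduling function (see detection.go below): count the running tasks of the same type and compare against the configured concurrency limit. A trimmed-down sketch, with an invented Task type:

package main

import "fmt"

// Task is a trimmed stand-in for types.Task.
type Task struct {
	Type string
}

// canScheduleNow reports whether another task of taskType fits under the
// concurrency limit, given the currently running tasks.
func canScheduleNow(running []*Task, taskType string, maxConcurrent int) bool {
	count := 0
	for _, t := range running {
		if t.Type == taskType {
			count++
		}
	}
	return count < maxConcurrent
}

func main() {
	running := []*Task{{Type: "balance"}}
	fmt.Println(canScheduleNow(running, "balance", 1)) // false: limit of 1 reached
	fmt.Println(canScheduleNow(running, "vacuum", 1))  // true: no vacuum running
}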
diff --git a/weed/worker/tasks/balance/balance_typed.go b/weed/worker/tasks/balance/balance_typed.go
new file mode 100644
index 000000000..91cd912f0
--- /dev/null
+++ b/weed/worker/tasks/balance/balance_typed.go
@@ -0,0 +1,156 @@
+package balance
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TypedTask implements balance operation with typed protobuf parameters
+type TypedTask struct {
+ *base.BaseTypedTask
+
+ // Task state from protobuf
+ sourceServer string
+ destNode string
+ volumeID uint32
+ collection string
+ estimatedSize uint64
+ placementScore float64
+ forceMove bool
+ timeoutSeconds int32
+ placementConflicts []string
+}
+
+// NewTypedTask creates a new typed balance task
+func NewTypedTask() types.TypedTaskInterface {
+ task := &TypedTask{
+ BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeBalance),
+ }
+ return task
+}
+
+// ValidateTyped validates the typed parameters for balance task
+func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
+ // Basic validation from base class
+ if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
+ return err
+ }
+
+ // Check that we have balance-specific parameters
+ balanceParams := params.GetBalanceParams()
+ if balanceParams == nil {
+ return fmt.Errorf("balance_params is required for balance task")
+ }
+
+ // Validate destination node
+ if balanceParams.DestNode == "" {
+ return fmt.Errorf("dest_node is required for balance task")
+ }
+
+ // Validate estimated size
+ if balanceParams.EstimatedSize == 0 {
+ return fmt.Errorf("estimated_size must be greater than 0")
+ }
+
+ // Validate timeout
+ if balanceParams.TimeoutSeconds <= 0 {
+ return fmt.Errorf("timeout_seconds must be greater than 0")
+ }
+
+ return nil
+}
+
+// EstimateTimeTyped estimates the time needed for balance operation based on protobuf parameters
+func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
+ balanceParams := params.GetBalanceParams()
+ if balanceParams != nil {
+ // Use the timeout from parameters if specified
+ if balanceParams.TimeoutSeconds > 0 {
+ return time.Duration(balanceParams.TimeoutSeconds) * time.Second
+ }
+
+ // Estimate based on volume size (1 minute per GB)
+ if balanceParams.EstimatedSize > 0 {
+ gbSize := balanceParams.EstimatedSize / (1024 * 1024 * 1024)
+ return time.Duration(gbSize) * time.Minute
+ }
+ }
+
+ // Default estimation
+ return 10 * time.Minute
+}
+
+// ExecuteTyped implements the balance operation with typed parameters
+func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
+ // Extract basic parameters
+ t.volumeID = params.VolumeId
+ t.sourceServer = params.Server
+ t.collection = params.Collection
+
+ // Extract balance-specific parameters
+ balanceParams := params.GetBalanceParams()
+ if balanceParams != nil {
+ t.destNode = balanceParams.DestNode
+ t.estimatedSize = balanceParams.EstimatedSize
+ t.placementScore = balanceParams.PlacementScore
+ t.forceMove = balanceParams.ForceMove
+ t.timeoutSeconds = balanceParams.TimeoutSeconds
+ t.placementConflicts = balanceParams.PlacementConflicts
+ }
+
+ glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)",
+ t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize)
+
+ // Log placement information
+ if t.placementScore > 0 {
+ glog.V(1).Infof("Placement score: %.2f", t.placementScore)
+ }
+ if len(t.placementConflicts) > 0 {
+ glog.V(1).Infof("Placement conflicts: %v", t.placementConflicts)
+ if !t.forceMove {
+ return fmt.Errorf("placement conflicts detected and force_move is false: %v", t.placementConflicts)
+ }
+ glog.Warningf("Proceeding with balance despite conflicts (force_move=true): %v", t.placementConflicts)
+ }
+
+ // Simulate balance operation with progress updates
+ steps := []struct {
+ name string
+ duration time.Duration
+ progress float64
+ }{
+ {"Analyzing cluster state", 2 * time.Second, 15},
+ {"Verifying destination capacity", 1 * time.Second, 25},
+ {"Starting volume migration", 1 * time.Second, 35},
+ {"Moving volume data", 6 * time.Second, 75},
+ {"Updating cluster metadata", 2 * time.Second, 95},
+ {"Verifying balance completion", 1 * time.Second, 100},
+ }
+
+ for _, step := range steps {
+ if t.IsCancelled() {
+ return fmt.Errorf("balance task cancelled during: %s", step.name)
+ }
+
+ glog.V(1).Infof("Balance task step: %s", step.name)
+ t.SetProgress(step.progress)
+
+ // Simulate work
+ time.Sleep(step.duration)
+ }
+
+ glog.Infof("Typed balance task completed successfully for volume %d: %s -> %s",
+ t.volumeID, t.sourceServer, t.destNode)
+ return nil
+}
+
+// Register the typed task in the global registry
+func init() {
+ types.RegisterGlobalTypedTask(types.TaskTypeBalance, NewTypedTask)
+ glog.V(1).Infof("Registered typed balance task")
+}
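
EstimateTimeTyped above prefers an explicit timeout and otherwise falls back to roughly one minute per GiB of volume data. A standalone sketch of that estimate; note that, as in the code above, sub-GiB sizes truncate to a zero-minute estimate, since the 10-minute default only applies when EstimatedSize is zero.

package main

import (
	"fmt"
	"time"
)

// estimate mirrors EstimateTimeTyped: explicit timeout wins, then ~1 minute
// per GiB, then a 10-minute default when no size is known.
func estimate(timeoutSeconds int32, estimatedSize uint64) time.Duration {
	if timeoutSeconds > 0 {
		return time.Duration(timeoutSeconds) * time.Second
	}
	if estimatedSize > 0 {
		gib := estimatedSize / (1024 * 1024 * 1024)
		return time.Duration(gib) * time.Minute // sub-GiB sizes truncate to 0
	}
	return 10 * time.Minute
}

func main() {
	fmt.Println(estimate(90, 0))    // 1m30s: explicit timeout wins
	fmt.Println(estimate(0, 5<<30)) // 5m0s: 5 GiB at one minute per GiB
	fmt.Println(estimate(0, 0))     // 10m0s: default
}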
diff --git a/weed/worker/tasks/balance/config.go b/weed/worker/tasks/balance/config.go
new file mode 100644
index 000000000..9303b4b2a
--- /dev/null
+++ b/weed/worker/tasks/balance/config.go
@@ -0,0 +1,170 @@
+package balance
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with balance-specific settings
+type Config struct {
+ base.BaseConfig
+ ImbalanceThreshold float64 `json:"imbalance_threshold"`
+ MinServerCount int `json:"min_server_count"`
+}
+
+// NewDefaultConfig creates a new default balance configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 30 * 60, // 30 minutes
+ MaxConcurrent: 1,
+ },
+ ImbalanceThreshold: 0.2, // 20%
+ MinServerCount: 2,
+ }
+}
+
+// GetConfigSpec returns the configuration schema for balance tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Balance Tasks",
+ Description: "Whether balance tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic balance task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 30 * 60,
+ MinValue: 5 * 60,
+ MaxValue: 2 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volume distribution imbalances",
+ HelpText: "The system will check for volume distribution imbalances at this interval",
+ Placeholder: "30",
+ Unit: config.UnitMinutes,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 1,
+ MinValue: 1,
+ MaxValue: 3,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of balance tasks that can run simultaneously",
+ HelpText: "Limits the number of balance operations running at the same time",
+ Placeholder: "1 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "imbalance_threshold",
+ JSONName: "imbalance_threshold",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.2,
+ MinValue: 0.05,
+ MaxValue: 0.5,
+ Required: true,
+ DisplayName: "Imbalance Threshold",
+ Description: "Minimum imbalance ratio to trigger balancing",
+ HelpText: "Volume distribution imbalances above this threshold will trigger balancing",
+ Placeholder: "0.20 (20%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_server_count",
+ JSONName: "min_server_count",
+ Type: config.FieldTypeInt,
+ DefaultValue: 2,
+ MinValue: 2,
+ MaxValue: 10,
+ Required: true,
+ DisplayName: "Minimum Server Count",
+ Description: "Minimum number of servers required for balancing",
+ HelpText: "Balancing will only occur if there are at least this many servers",
+ Placeholder: "2 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
+
+// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
+func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+ BalanceConfig: &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: float64(c.ImbalanceThreshold),
+ MinServerCount: int32(c.MinServerCount),
+ },
+ },
+ }
+}
+
+// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
+func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+
+ // Set general TaskPolicy fields
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
+
+ // Set balance-specific fields from the task config
+ if balanceConfig := policy.GetBalanceConfig(); balanceConfig != nil {
+ c.ImbalanceThreshold = float64(balanceConfig.ImbalanceThreshold)
+ c.MinServerCount = int(balanceConfig.MinServerCount)
+ }
+
+ return nil
+}
+
+// LoadConfigFromPersistence loads configuration from the persistence layer if available
+func LoadConfigFromPersistence(configPersistence interface{}) *Config {
+ config := NewDefaultConfig()
+
+ // Try to load from persistence if available
+ if persistence, ok := configPersistence.(interface {
+ LoadBalanceTaskPolicy() (*worker_pb.TaskPolicy, error)
+ }); ok {
+ if policy, err := persistence.LoadBalanceTaskPolicy(); err == nil && policy != nil {
+ if err := config.FromTaskPolicy(policy); err == nil {
+ glog.V(1).Infof("Loaded balance configuration from persistence")
+ return config
+ }
+ }
+ }
+
+ glog.V(1).Infof("Using default balance configuration")
+ return config
+}
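
LoadConfigFromPersistence above accepts interface{} and asserts it against an anonymous interface, so the balance package can consume any persistence layer that happens to implement LoadBalanceTaskPolicy without importing it. A minimal sketch of that duck-typing pattern with invented types:

package main

import "fmt"

type Policy struct {
	Enabled bool
}

// store implements LoadBalanceTaskPolicy without declaring any interface.
type store struct{}

func (store) LoadBalanceTaskPolicy() (*Policy, error) {
	return &Policy{Enabled: true}, nil
}

// load asserts against an anonymous interface, exactly the shape used in
// LoadConfigFromPersistence, and falls back to defaults on any failure.
func load(persistence interface{}) *Policy {
	if p, ok := persistence.(interface {
		LoadBalanceTaskPolicy() (*Policy, error)
	}); ok {
		if policy, err := p.LoadBalanceTaskPolicy(); err == nil && policy != nil {
			return policy
		}
	}
	return &Policy{} // defaults
}

func main() {
	fmt.Printf("%+v\n", load(store{})) // &{Enabled:true}
}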
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
new file mode 100644
index 000000000..f4bcf3ca3
--- /dev/null
+++ b/weed/worker/tasks/balance/detection.go
@@ -0,0 +1,134 @@
+package balance
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for balance tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ balanceConfig, ok := config.(*Config)
+ if !ok {
+ return nil, fmt.Errorf("BALANCE: unexpected config type %T", config)
+ }
+
+ // Skip if cluster is too small
+ minVolumeCount := 2 // low floor so small clusters can still qualify
+ if len(metrics) < minVolumeCount {
+ glog.Infof("BALANCE: No tasks created - cluster too small (%d volumes, need ≥%d)", len(metrics), minVolumeCount)
+ return nil, nil
+ }
+
+ // Analyze volume distribution across servers
+ serverVolumeCounts := make(map[string]int)
+ for _, metric := range metrics {
+ serverVolumeCounts[metric.Server]++
+ }
+
+ if len(serverVolumeCounts) < balanceConfig.MinServerCount {
+ glog.Infof("BALANCE: No tasks created - too few servers (%d servers, need ≥%d)", len(serverVolumeCounts), balanceConfig.MinServerCount)
+ return nil, nil
+ }
+
+ // Calculate balance metrics
+ totalVolumes := len(metrics)
+ avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
+
+ maxVolumes := 0
+ minVolumes := totalVolumes
+ maxServer := ""
+ minServer := ""
+
+ for server, count := range serverVolumeCounts {
+ if count > maxVolumes {
+ maxVolumes = count
+ maxServer = server
+ }
+ if count < minVolumes {
+ minVolumes = count
+ minServer = server
+ }
+ }
+
+ // Check if imbalance exceeds threshold
+ imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
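+ // Example: servers holding 10, 6, and 2 volumes give avg=6 and an
+ // imbalanceRatio of (10-2)/6 ≈ 1.33, far above the default 0.2 threshold.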
+ if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
+ glog.Infof("BALANCE: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
+ imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+ return nil, nil
+ }
+
+ // Select a volume from the overloaded server for balance
+ var selectedVolume *types.VolumeHealthMetrics
+ for _, metric := range metrics {
+ if metric.Server == maxServer {
+ selectedVolume = metric
+ break
+ }
+ }
+
+ if selectedVolume == nil {
+ glog.Warningf("BALANCE: Could not find volume on overloaded server %s", maxServer)
+ return nil, nil
+ }
+
+ // Create balance task with volume and destination planning info
+ reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
+ imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+
+ task := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeBalance,
+ VolumeID: selectedVolume.VolumeID,
+ Server: selectedVolume.Server,
+ Collection: selectedVolume.Collection,
+ Priority: types.TaskPriorityNormal,
+ Reason: reason,
+ ScheduleAt: time.Now(),
+ // TypedParams will be populated by the maintenance integration
+ // with destination planning information
+ }
+
+ return []*types.TaskDetectionResult{task}, nil
+}
+
+// Scheduling implements the scheduling logic for balance tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ balanceConfig, ok := config.(*Config)
+ if !ok {
+ glog.Warningf("BALANCE: unexpected config type %T; skipping scheduling", config)
+ return false
+ }
+
+ // Count running balance tasks
+ runningBalanceCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeBalance {
+ runningBalanceCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningBalanceCount >= balanceConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check if we have available workers
+ availableWorkerCount := 0
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeBalance {
+ availableWorkerCount++
+ break
+ }
+ }
+ }
+
+ return availableWorkerCount > 0
+}
+
+// CreateTask creates a new balance task instance
+func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Create and return the balance task using existing Task type
+ return NewTask(params.Server, params.VolumeID, params.Collection), nil
+}
diff --git a/weed/worker/tasks/balance/ui.go b/weed/worker/tasks/balance/ui.go
deleted file mode 100644
index 2cea20a76..000000000
--- a/weed/worker/tasks/balance/ui.go
+++ /dev/null
@@ -1,361 +0,0 @@
-package balance
-
-import (
- "fmt"
- "html/template"
- "strconv"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// UIProvider provides the UI for balance task configuration
-type UIProvider struct {
- detector *BalanceDetector
- scheduler *BalanceScheduler
-}
-
-// NewUIProvider creates a new balance UI provider
-func NewUIProvider(detector *BalanceDetector, scheduler *BalanceScheduler) *UIProvider {
- return &UIProvider{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetTaskType returns the task type
-func (ui *UIProvider) GetTaskType() types.TaskType {
- return types.TaskTypeBalance
-}
-
-// GetDisplayName returns the human-readable name
-func (ui *UIProvider) GetDisplayName() string {
- return "Volume Balance"
-}
-
-// GetDescription returns a description of what this task does
-func (ui *UIProvider) GetDescription() string {
- return "Redistributes volumes across volume servers to optimize storage utilization and performance"
-}
-
-// GetIcon returns the icon CSS class for this task type
-func (ui *UIProvider) GetIcon() string {
- return "fas fa-balance-scale text-secondary"
-}
-
-// BalanceConfig represents the balance configuration
-type BalanceConfig struct {
- Enabled bool `json:"enabled"`
- ImbalanceThreshold float64 `json:"imbalance_threshold"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- MinServerCount int `json:"min_server_count"`
- MoveDuringOffHours bool `json:"move_during_off_hours"`
- OffHoursStart string `json:"off_hours_start"`
- OffHoursEnd string `json:"off_hours_end"`
- MinIntervalSeconds int `json:"min_interval_seconds"`
-}
-
-// Helper functions for duration conversion
-func secondsToDuration(seconds int) time.Duration {
- return time.Duration(seconds) * time.Second
-}
-
-func durationToSeconds(d time.Duration) int {
- return int(d.Seconds())
-}
-
-// formatDurationForUser formats seconds as a user-friendly duration string
-func formatDurationForUser(seconds int) string {
- d := secondsToDuration(seconds)
- if d < time.Minute {
- return fmt.Sprintf("%ds", seconds)
- }
- if d < time.Hour {
- return fmt.Sprintf("%.0fm", d.Minutes())
- }
- if d < 24*time.Hour {
- return fmt.Sprintf("%.1fh", d.Hours())
- }
- return fmt.Sprintf("%.1fd", d.Hours()/24)
-}
-
-// RenderConfigForm renders the configuration form HTML
-func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) {
- config := ui.getCurrentBalanceConfig()
-
- // Build form using the FormBuilder helper
- form := types.NewFormBuilder()
-
- // Detection Settings
- form.AddCheckboxField(
- "enabled",
- "Enable Balance Tasks",
- "Whether balance tasks should be automatically created",
- config.Enabled,
- )
-
- form.AddNumberField(
- "imbalance_threshold",
- "Imbalance Threshold (%)",
- "Trigger balance when storage imbalance exceeds this percentage (0.0-1.0)",
- config.ImbalanceThreshold,
- true,
- )
-
- form.AddDurationField("scan_interval", "Scan Interval", "How often to scan for imbalanced volumes", secondsToDuration(config.ScanIntervalSeconds), true)
-
- // Scheduling Settings
- form.AddNumberField(
- "max_concurrent",
- "Max Concurrent Tasks",
- "Maximum number of balance tasks that can run simultaneously",
- float64(config.MaxConcurrent),
- true,
- )
-
- form.AddNumberField(
- "min_server_count",
- "Minimum Server Count",
- "Only balance when at least this many servers are available",
- float64(config.MinServerCount),
- true,
- )
-
- // Timing Settings
- form.AddCheckboxField(
- "move_during_off_hours",
- "Restrict to Off-Hours",
- "Only perform balance operations during off-peak hours",
- config.MoveDuringOffHours,
- )
-
- form.AddTextField(
- "off_hours_start",
- "Off-Hours Start Time",
- "Start time for off-hours window (e.g., 23:00)",
- config.OffHoursStart,
- false,
- )
-
- form.AddTextField(
- "off_hours_end",
- "Off-Hours End Time",
- "End time for off-hours window (e.g., 06:00)",
- config.OffHoursEnd,
- false,
- )
-
- // Timing constraints
- form.AddDurationField("min_interval", "Min Interval", "Minimum time between balance operations", secondsToDuration(config.MinIntervalSeconds), true)
-
- // Generate organized form sections using Bootstrap components
- html := `
-<div class="row">
- <div class="col-12">
- <div class="card mb-4">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-balance-scale me-2"></i>
- Balance Configuration
- </h5>
- </div>
- <div class="card-body">
-` + string(form.Build()) + `
- </div>
- </div>
- </div>
-</div>
-
-<div class="row">
- <div class="col-12">
- <div class="card mb-3">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-exclamation-triangle me-2"></i>
- Performance Considerations
- </h5>
- </div>
- <div class="card-body">
- <div class="alert alert-warning" role="alert">
- <h6 class="alert-heading">Important Considerations:</h6>
- <p class="mb-2"><strong>Performance:</strong> Volume balancing involves data movement and can impact cluster performance.</p>
- <p class="mb-2"><strong>Recommendation:</strong> Enable off-hours restriction to minimize impact on production workloads.</p>
- <p class="mb-0"><strong>Safety:</strong> Requires at least ` + fmt.Sprintf("%d", config.MinServerCount) + ` servers to ensure data safety during moves.</p>
- </div>
- </div>
- </div>
- </div>
-</div>`
-
- return template.HTML(html), nil
-}
-
-// ParseConfigForm parses form data into configuration
-func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) {
- config := &BalanceConfig{}
-
- // Parse enabled
- config.Enabled = len(formData["enabled"]) > 0
-
- // Parse imbalance threshold
- if values, ok := formData["imbalance_threshold"]; ok && len(values) > 0 {
- threshold, err := strconv.ParseFloat(values[0], 64)
- if err != nil {
- return nil, fmt.Errorf("invalid imbalance threshold: %w", err)
- }
- if threshold < 0 || threshold > 1 {
- return nil, fmt.Errorf("imbalance threshold must be between 0.0 and 1.0")
- }
- config.ImbalanceThreshold = threshold
- }
-
- // Parse scan interval
- if values, ok := formData["scan_interval"]; ok && len(values) > 0 {
- duration, err := time.ParseDuration(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid scan interval: %w", err)
- }
- config.ScanIntervalSeconds = int(duration.Seconds())
- }
-
- // Parse max concurrent
- if values, ok := formData["max_concurrent"]; ok && len(values) > 0 {
- maxConcurrent, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid max concurrent: %w", err)
- }
- if maxConcurrent < 1 {
- return nil, fmt.Errorf("max concurrent must be at least 1")
- }
- config.MaxConcurrent = maxConcurrent
- }
-
- // Parse min server count
- if values, ok := formData["min_server_count"]; ok && len(values) > 0 {
- minServerCount, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid min server count: %w", err)
- }
- if minServerCount < 2 {
- return nil, fmt.Errorf("min server count must be at least 2")
- }
- config.MinServerCount = minServerCount
- }
-
- // Parse off-hours settings
- config.MoveDuringOffHours = len(formData["move_during_off_hours"]) > 0
-
- if values, ok := formData["off_hours_start"]; ok && len(values) > 0 {
- config.OffHoursStart = values[0]
- }
-
- if values, ok := formData["off_hours_end"]; ok && len(values) > 0 {
- config.OffHoursEnd = values[0]
- }
-
- // Parse min interval
- if values, ok := formData["min_interval"]; ok && len(values) > 0 {
- duration, err := time.ParseDuration(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid min interval: %w", err)
- }
- config.MinIntervalSeconds = int(duration.Seconds())
- }
-
- return config, nil
-}
-
-// GetCurrentConfig returns the current configuration
-func (ui *UIProvider) GetCurrentConfig() interface{} {
- return ui.getCurrentBalanceConfig()
-}
-
-// ApplyConfig applies the new configuration
-func (ui *UIProvider) ApplyConfig(config interface{}) error {
- balanceConfig, ok := config.(*BalanceConfig)
- if !ok {
- return fmt.Errorf("invalid config type, expected *BalanceConfig")
- }
-
- // Apply to detector
- if ui.detector != nil {
- ui.detector.SetEnabled(balanceConfig.Enabled)
- ui.detector.SetThreshold(balanceConfig.ImbalanceThreshold)
- ui.detector.SetMinCheckInterval(secondsToDuration(balanceConfig.ScanIntervalSeconds))
- }
-
- // Apply to scheduler
- if ui.scheduler != nil {
- ui.scheduler.SetEnabled(balanceConfig.Enabled)
- ui.scheduler.SetMaxConcurrent(balanceConfig.MaxConcurrent)
- ui.scheduler.SetMinServerCount(balanceConfig.MinServerCount)
- ui.scheduler.SetMoveDuringOffHours(balanceConfig.MoveDuringOffHours)
- ui.scheduler.SetOffHoursStart(balanceConfig.OffHoursStart)
- ui.scheduler.SetOffHoursEnd(balanceConfig.OffHoursEnd)
- }
-
- glog.V(1).Infof("Applied balance configuration: enabled=%v, threshold=%.1f%%, max_concurrent=%d, min_servers=%d, off_hours=%v",
- balanceConfig.Enabled, balanceConfig.ImbalanceThreshold*100, balanceConfig.MaxConcurrent,
- balanceConfig.MinServerCount, balanceConfig.MoveDuringOffHours)
-
- return nil
-}
-
-// getCurrentBalanceConfig gets the current configuration from detector and scheduler
-func (ui *UIProvider) getCurrentBalanceConfig() *BalanceConfig {
- config := &BalanceConfig{
- // Default values (fallback if detectors/schedulers are nil)
- Enabled: true,
- ImbalanceThreshold: 0.1, // 10% imbalance
- ScanIntervalSeconds: durationToSeconds(4 * time.Hour),
- MaxConcurrent: 1,
- MinServerCount: 3,
- MoveDuringOffHours: true,
- OffHoursStart: "23:00",
- OffHoursEnd: "06:00",
- MinIntervalSeconds: durationToSeconds(1 * time.Hour),
- }
-
- // Get current values from detector
- if ui.detector != nil {
- config.Enabled = ui.detector.IsEnabled()
- config.ImbalanceThreshold = ui.detector.GetThreshold()
- config.ScanIntervalSeconds = int(ui.detector.ScanInterval().Seconds())
- }
-
- // Get current values from scheduler
- if ui.scheduler != nil {
- config.MaxConcurrent = ui.scheduler.GetMaxConcurrent()
- config.MinServerCount = ui.scheduler.GetMinServerCount()
- config.MoveDuringOffHours = ui.scheduler.GetMoveDuringOffHours()
- config.OffHoursStart = ui.scheduler.GetOffHoursStart()
- config.OffHoursEnd = ui.scheduler.GetOffHoursEnd()
- }
-
- return config
-}
-
-// RegisterUI registers the balance UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *BalanceDetector, scheduler *BalanceScheduler) {
- uiProvider := NewUIProvider(detector, scheduler)
- uiRegistry.RegisterUI(uiProvider)
-
- glog.V(1).Infof("✅ Registered balance task UI provider")
-}
-
-// DefaultBalanceConfig returns default balance configuration
-func DefaultBalanceConfig() *BalanceConfig {
- return &BalanceConfig{
- Enabled: false,
- ImbalanceThreshold: 0.3,
- ScanIntervalSeconds: durationToSeconds(4 * time.Hour),
- MaxConcurrent: 1,
- MinServerCount: 3,
- MoveDuringOffHours: false,
- OffHoursStart: "22:00",
- OffHoursEnd: "06:00",
- MinIntervalSeconds: durationToSeconds(1 * time.Hour),
- }
-}
diff --git a/weed/worker/tasks/base/generic_components.go b/weed/worker/tasks/base/generic_components.go
new file mode 100644
index 000000000..27ad1bb29
--- /dev/null
+++ b/weed/worker/tasks/base/generic_components.go
@@ -0,0 +1,129 @@
+package base
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// GenericDetector implements TaskDetector using function-based logic
+type GenericDetector struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericDetector creates a detector from a task definition
+func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector {
+ return &GenericDetector{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (d *GenericDetector) GetTaskType() types.TaskType {
+ return d.taskDef.Type
+}
+
+// ScanForTasks scans using the task definition's detection function
+func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
+ if d.taskDef.DetectionFunc == nil {
+ return nil, nil
+ }
+ return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config)
+}
+
+// ScanInterval returns the scan interval from task definition
+func (d *GenericDetector) ScanInterval() time.Duration {
+ if d.taskDef.ScanInterval > 0 {
+ return d.taskDef.ScanInterval
+ }
+ return 30 * time.Minute // Default
+}
+
+// IsEnabled returns whether this detector is enabled
+func (d *GenericDetector) IsEnabled() bool {
+ return d.taskDef.Config.IsEnabled()
+}
+
+// GenericScheduler implements TaskScheduler using function-based logic
+type GenericScheduler struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericScheduler creates a scheduler from a task definition
+func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler {
+ return &GenericScheduler{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (s *GenericScheduler) GetTaskType() types.TaskType {
+ return s.taskDef.Type
+}
+
+// CanScheduleNow determines if a task can be scheduled using the task definition's function
+func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if s.taskDef.SchedulingFunc == nil {
+ return s.defaultCanSchedule(task, runningTasks, availableWorkers)
+ }
+ return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config)
+}
+
+// defaultCanSchedule provides default scheduling logic
+func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if !s.taskDef.Config.IsEnabled() {
+ return false
+ }
+
+ // Count running tasks of this type
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == s.taskDef.Type {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ maxConcurrent := s.taskDef.MaxConcurrent
+ if maxConcurrent <= 0 {
+ maxConcurrent = 1 // Default
+ }
+ if runningCount >= maxConcurrent {
+ return false
+ }
+
+ // Check if we have available workers
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == s.taskDef.Type {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
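+
+// For example, with MaxConcurrent=1 and one task of this type already
+// running, defaultCanSchedule returns false until that task finishes.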
+
+// GetPriority returns the priority for this task
+func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority {
+ return task.Priority
+}
+
+// GetMaxConcurrent returns max concurrent tasks
+func (s *GenericScheduler) GetMaxConcurrent() int {
+ if s.taskDef.MaxConcurrent > 0 {
+ return s.taskDef.MaxConcurrent
+ }
+ return 1 // Default
+}
+
+// GetDefaultRepeatInterval returns the default repeat interval
+func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration {
+ if s.taskDef.RepeatInterval > 0 {
+ return s.taskDef.RepeatInterval
+ }
+ return 24 * time.Hour // Default
+}
+
+// IsEnabled returns whether this scheduler is enabled
+func (s *GenericScheduler) IsEnabled() bool {
+ return s.taskDef.Config.IsEnabled()
+}
diff --git a/weed/worker/tasks/base/registration.go b/weed/worker/tasks/base/registration.go
new file mode 100644
index 000000000..416b6f6b8
--- /dev/null
+++ b/weed/worker/tasks/base/registration.go
@@ -0,0 +1,155 @@
+package base
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// GenericFactory creates task instances using a TaskDefinition
+type GenericFactory struct {
+ *tasks.BaseTaskFactory
+ taskDef *TaskDefinition
+}
+
+// NewGenericFactory creates a generic task factory
+func NewGenericFactory(taskDef *TaskDefinition) *GenericFactory {
+ return &GenericFactory{
+ BaseTaskFactory: tasks.NewBaseTaskFactory(
+ taskDef.Type,
+ taskDef.Capabilities,
+ taskDef.Description,
+ ),
+ taskDef: taskDef,
+ }
+}
+
+// Create creates a task instance using the task definition
+func (f *GenericFactory) Create(params types.TaskParams) (types.TaskInterface, error) {
+ if f.taskDef.CreateTask == nil {
+ return nil, fmt.Errorf("no task creation function defined for %s", f.taskDef.Type)
+ }
+ return f.taskDef.CreateTask(params)
+}
+
+// GenericSchemaProvider provides config schema from TaskDefinition
+type GenericSchemaProvider struct {
+ taskDef *TaskDefinition
+}
+
+// GetConfigSchema returns the schema from task definition
+func (p *GenericSchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
+ return &tasks.TaskConfigSchema{
+ TaskName: string(p.taskDef.Type),
+ DisplayName: p.taskDef.DisplayName,
+ Description: p.taskDef.Description,
+ Icon: p.taskDef.Icon,
+ Schema: config.Schema{
+ Fields: p.taskDef.ConfigSpec.Fields,
+ },
+ }
+}
+
+// GenericUIProvider provides UI functionality from TaskDefinition
+type GenericUIProvider struct {
+ taskDef *TaskDefinition
+}
+
+// GetTaskType returns the task type
+func (ui *GenericUIProvider) GetTaskType() types.TaskType {
+ return ui.taskDef.Type
+}
+
+// GetDisplayName returns the human-readable name
+func (ui *GenericUIProvider) GetDisplayName() string {
+ return ui.taskDef.DisplayName
+}
+
+// GetDescription returns a description of what this task does
+func (ui *GenericUIProvider) GetDescription() string {
+ return ui.taskDef.Description
+}
+
+// GetIcon returns the icon CSS class for this task type
+func (ui *GenericUIProvider) GetIcon() string {
+ return ui.taskDef.Icon
+}
+
+// GetCurrentConfig returns current config as TaskConfig
+func (ui *GenericUIProvider) GetCurrentConfig() types.TaskConfig {
+ return ui.taskDef.Config
+}
+
+// ApplyTaskPolicy applies protobuf TaskPolicy configuration
+func (ui *GenericUIProvider) ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return ui.taskDef.Config.FromTaskPolicy(policy)
+}
+
+// ApplyTaskConfig applies TaskConfig interface configuration
+func (ui *GenericUIProvider) ApplyTaskConfig(config types.TaskConfig) error {
+ taskPolicy := config.ToTaskPolicy()
+ return ui.taskDef.Config.FromTaskPolicy(taskPolicy)
+}
+
+// RegisterTask registers a complete task definition with all registries
+func RegisterTask(taskDef *TaskDefinition) {
+ // Validate task definition
+ if err := validateTaskDefinition(taskDef); err != nil {
+ glog.Errorf("Invalid task definition for %s: %v", taskDef.Type, err)
+ return
+ }
+
+ // Create and register factory
+ factory := NewGenericFactory(taskDef)
+ tasks.AutoRegister(taskDef.Type, factory)
+
+ // Create and register detector/scheduler
+ detector := NewGenericDetector(taskDef)
+ scheduler := NewGenericScheduler(taskDef)
+
+ tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
+ registry.RegisterTask(detector, scheduler)
+ })
+
+ // Create and register schema provider
+ schemaProvider := &GenericSchemaProvider{taskDef: taskDef}
+ tasks.RegisterTaskConfigSchema(string(taskDef.Type), schemaProvider)
+
+ // Create and register UI provider
+ uiProvider := &GenericUIProvider{taskDef: taskDef}
+ tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
+ baseUIProvider := tasks.NewBaseUIProvider(
+ taskDef.Type,
+ taskDef.DisplayName,
+ taskDef.Description,
+ taskDef.Icon,
+ schemaProvider.GetConfigSchema,
+ uiProvider.GetCurrentConfig,
+ uiProvider.ApplyTaskPolicy,
+ uiProvider.ApplyTaskConfig,
+ )
+ uiRegistry.RegisterUI(baseUIProvider)
+ })
+
+ glog.V(1).Infof("✅ Registered complete task definition: %s", taskDef.Type)
+}
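+
+// Illustrative call from a task package's init function (the field values
+// mirror the balance package; treat this as a sketch, not a prescription):
+//
+//	base.RegisterTask(&base.TaskDefinition{
+//		Type:           types.TaskTypeBalance,
+//		Name:           "balance",
+//		DisplayName:    "Volume Balance",
+//		Config:         NewDefaultConfig(),
+//		ConfigSpec:     GetConfigSpec(),
+//		DetectionFunc:  Detection,
+//		SchedulingFunc: Scheduling,
+//		CreateTask:     CreateTask,
+//	})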
+
+// validateTaskDefinition ensures the task definition is complete
+func validateTaskDefinition(taskDef *TaskDefinition) error {
+ if taskDef.Type == "" {
+ return fmt.Errorf("task type is required")
+ }
+ if taskDef.Name == "" {
+ return fmt.Errorf("task name is required")
+ }
+ if taskDef.Config == nil {
+ return fmt.Errorf("task config is required")
+ }
+ // CreateTask is optional for tasks that use the typed task system
+ // The typed system registers tasks separately via types.RegisterGlobalTypedTask()
+ return nil
+}
diff --git a/weed/worker/tasks/base/task_definition.go b/weed/worker/tasks/base/task_definition.go
new file mode 100644
index 000000000..6689d9c81
--- /dev/null
+++ b/weed/worker/tasks/base/task_definition.go
@@ -0,0 +1,272 @@
+package base
+
+import (
+ "fmt"
+ "reflect"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TaskDefinition encapsulates everything needed to define a complete task type
+type TaskDefinition struct {
+ // Basic task information
+ Type types.TaskType
+ Name string
+ DisplayName string
+ Description string
+ Icon string
+ Capabilities []string
+
+ // Task configuration
+ Config TaskConfig
+ ConfigSpec ConfigSpec
+
+ // Task creation
+ CreateTask func(params types.TaskParams) (types.TaskInterface, error)
+
+ // Detection logic
+ DetectionFunc func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config TaskConfig) ([]*types.TaskDetectionResult, error)
+ ScanInterval time.Duration
+
+ // Scheduling logic
+ SchedulingFunc func(task *types.Task, running []*types.Task, workers []*types.Worker, config TaskConfig) bool
+ MaxConcurrent int
+ RepeatInterval time.Duration
+}
+
+// TaskConfig provides a configuration interface that supports type-safe defaults
+type TaskConfig interface {
+ config.ConfigWithDefaults // Extends ConfigWithDefaults for type-safe schema operations
+ IsEnabled() bool
+ SetEnabled(bool)
+ ToTaskPolicy() *worker_pb.TaskPolicy
+ FromTaskPolicy(policy *worker_pb.TaskPolicy) error
+}
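+
+// The balance and erasure_coding Config types in this changeset are concrete
+// implementations: each embeds BaseConfig and supplies its own
+// ToTaskPolicy/FromTaskPolicy mapping for the task-specific fields.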
+
+// ConfigSpec defines the configuration schema
+type ConfigSpec struct {
+ Fields []*config.Field
+}
+
+// BaseConfig provides common configuration fields with reflection-based serialization
+type BaseConfig struct {
+ Enabled bool `json:"enabled"`
+ ScanIntervalSeconds int `json:"scan_interval_seconds"`
+ MaxConcurrent int `json:"max_concurrent"`
+}
+
+// IsEnabled returns whether the task is enabled
+func (c *BaseConfig) IsEnabled() bool {
+ return c.Enabled
+}
+
+// SetEnabled sets whether the task is enabled
+func (c *BaseConfig) SetEnabled(enabled bool) {
+ c.Enabled = enabled
+}
+
+// Validate validates the base configuration
+func (c *BaseConfig) Validate() error {
+ // No common validation rules yet; concrete configs add their own checks
+ return nil
+}
+
+// StructToMap converts any struct to a map using reflection
+func StructToMap(obj interface{}) map[string]interface{} {
+ result := make(map[string]interface{})
+ val := reflect.ValueOf(obj)
+
+ // Handle pointer to struct
+ if val.Kind() == reflect.Ptr {
+ val = val.Elem()
+ }
+
+ if val.Kind() != reflect.Struct {
+ return result
+ }
+
+ typ := val.Type()
+
+ for i := 0; i < val.NumField(); i++ {
+ field := val.Field(i)
+ fieldType := typ.Field(i)
+
+ // Skip unexported fields
+ if !field.CanInterface() {
+ continue
+ }
+
+ // Handle embedded structs recursively (before JSON tag check)
+ if field.Kind() == reflect.Struct && fieldType.Anonymous {
+ embeddedMap := StructToMap(field.Interface())
+ for k, v := range embeddedMap {
+ result[k] = v
+ }
+ continue
+ }
+
+ // Get JSON tag name
+ jsonTag := fieldType.Tag.Get("json")
+ if jsonTag == "" || jsonTag == "-" {
+ continue
+ }
+
+ // Remove options like ",omitempty"
+ if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
+ jsonTag = jsonTag[:commaIdx]
+ }
+
+ result[jsonTag] = field.Interface()
+ }
+ return result
+}
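+
+// For example, a config embedding BaseConfig plus its own
+// "imbalance_threshold" field flattens into a single-level map:
+//
+//	{"enabled": true, "scan_interval_seconds": 1800, "max_concurrent": 1, "imbalance_threshold": 0.2}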
+
+// MapToStruct loads data from map into struct using reflection
+func MapToStruct(data map[string]interface{}, obj interface{}) error {
+ val := reflect.ValueOf(obj)
+
+ // Must be pointer to struct
+ if val.Kind() != reflect.Ptr || val.Elem().Kind() != reflect.Struct {
+ return fmt.Errorf("obj must be pointer to struct")
+ }
+
+ val = val.Elem()
+ typ := val.Type()
+
+ for i := 0; i < val.NumField(); i++ {
+ field := val.Field(i)
+ fieldType := typ.Field(i)
+
+ // Skip unexported fields
+ if !field.CanSet() {
+ continue
+ }
+
+ // Handle embedded structs recursively (before JSON tag check)
+ if field.Kind() == reflect.Struct && fieldType.Anonymous {
+ err := MapToStruct(data, field.Addr().Interface())
+ if err != nil {
+ return err
+ }
+ continue
+ }
+
+ // Get JSON tag name
+ jsonTag := fieldType.Tag.Get("json")
+ if jsonTag == "" || jsonTag == "-" {
+ continue
+ }
+
+ // Remove options like ",omitempty"
+ if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
+ jsonTag = jsonTag[:commaIdx]
+ }
+
+ if value, exists := data[jsonTag]; exists {
+ err := setFieldValue(field, value)
+ if err != nil {
+ return fmt.Errorf("failed to set field %s: %v", jsonTag, err)
+ }
+ }
+ }
+
+ return nil
+}
+
+// ToTaskPolicy converts BaseConfig to protobuf (partial implementation)
+// Note: Concrete implementations should override this to include task-specific config
+func (c *BaseConfig) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ // TaskConfig field should be set by concrete implementations
+ }
+}
+
+// FromTaskPolicy loads BaseConfig from protobuf (partial implementation)
+// Note: Concrete implementations should override this to handle task-specific config
+func (c *BaseConfig) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds)
+ return nil
+}
+
+// ApplySchemaDefaults applies default values from schema using reflection
+func (c *BaseConfig) ApplySchemaDefaults(schema *config.Schema) error {
+ // Use reflection-based approach for BaseConfig since it needs to handle embedded structs
+ return schema.ApplyDefaultsToProtobuf(c)
+}
+
+// setFieldValue sets a field value with type conversion
+func setFieldValue(field reflect.Value, value interface{}) error {
+ if value == nil {
+ return nil
+ }
+
+ valueVal := reflect.ValueOf(value)
+ fieldType := field.Type()
+ valueType := valueVal.Type()
+
+ // Direct assignment if types match
+ if valueType.AssignableTo(fieldType) {
+ field.Set(valueVal)
+ return nil
+ }
+
+ // Type conversion for common cases
+ switch fieldType.Kind() {
+ case reflect.Bool:
+ if b, ok := value.(bool); ok {
+ field.SetBool(b)
+ } else {
+ return fmt.Errorf("cannot convert %T to bool", value)
+ }
+ case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+ switch v := value.(type) {
+ case int:
+ field.SetInt(int64(v))
+ case int32:
+ field.SetInt(int64(v))
+ case int64:
+ field.SetInt(v)
+ case float64:
+ field.SetInt(int64(v))
+ default:
+ return fmt.Errorf("cannot convert %T to int", value)
+ }
+ case reflect.Float32, reflect.Float64:
+ switch v := value.(type) {
+ case float32:
+ field.SetFloat(float64(v))
+ case float64:
+ field.SetFloat(v)
+ case int:
+ field.SetFloat(float64(v))
+ case int64:
+ field.SetFloat(float64(v))
+ default:
+ return fmt.Errorf("cannot convert %T to float", value)
+ }
+ case reflect.String:
+ if s, ok := value.(string); ok {
+ field.SetString(s)
+ } else {
+ return fmt.Errorf("cannot convert %T to string", value)
+ }
+ default:
+ return fmt.Errorf("unsupported field type %s", fieldType.Kind())
+ }
+
+ return nil
+}
diff --git a/weed/worker/tasks/base/task_definition_test.go b/weed/worker/tasks/base/task_definition_test.go
new file mode 100644
index 000000000..a0a0a5a24
--- /dev/null
+++ b/weed/worker/tasks/base/task_definition_test.go
@@ -0,0 +1,338 @@
+package base
+
+import (
+ "reflect"
+ "testing"
+)
+
+// Test structs that mirror the actual configuration structure
+type TestBaseConfig struct {
+ Enabled bool `json:"enabled"`
+ ScanIntervalSeconds int `json:"scan_interval_seconds"`
+ MaxConcurrent int `json:"max_concurrent"`
+}
+
+type TestTaskConfig struct {
+ TestBaseConfig
+ TaskSpecificField float64 `json:"task_specific_field"`
+ AnotherSpecificField string `json:"another_specific_field"`
+}
+
+type TestNestedConfig struct {
+ TestBaseConfig
+ NestedStruct struct {
+ NestedField string `json:"nested_field"`
+ } `json:"nested_struct"`
+ TaskField int `json:"task_field"`
+}
+
+func TestStructToMap_WithEmbeddedStruct(t *testing.T) {
+ // Test case 1: Basic embedded struct
+ config := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 3,
+ },
+ TaskSpecificField: 0.25,
+ AnotherSpecificField: "test_value",
+ }
+
+ result := StructToMap(config)
+
+ // Verify all fields are present
+ expectedFields := map[string]interface{}{
+ "enabled": true,
+ "scan_interval_seconds": 1800,
+ "max_concurrent": 3,
+ "task_specific_field": 0.25,
+ "another_specific_field": "test_value",
+ }
+
+ if len(result) != len(expectedFields) {
+ t.Errorf("Expected %d fields, got %d. Result: %+v", len(expectedFields), len(result), result)
+ }
+
+ for key, expectedValue := range expectedFields {
+ if actualValue, exists := result[key]; !exists {
+ t.Errorf("Missing field: %s", key)
+ } else if !reflect.DeepEqual(actualValue, expectedValue) {
+ t.Errorf("Field %s: expected %v (%T), got %v (%T)", key, expectedValue, expectedValue, actualValue, actualValue)
+ }
+ }
+}
+
+func TestStructToMap_WithNestedStruct(t *testing.T) {
+ config := &TestNestedConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: false,
+ ScanIntervalSeconds: 3600,
+ MaxConcurrent: 1,
+ },
+ NestedStruct: struct {
+ NestedField string `json:"nested_field"`
+ }{
+ NestedField: "nested_value",
+ },
+ TaskField: 42,
+ }
+
+ result := StructToMap(config)
+
+ // Verify embedded struct fields are included
+ if enabled, exists := result["enabled"]; !exists || enabled != false {
+ t.Errorf("Expected enabled=false from embedded struct, got %v", enabled)
+ }
+
+ if scanInterval, exists := result["scan_interval_seconds"]; !exists || scanInterval != 3600 {
+ t.Errorf("Expected scan_interval_seconds=3600 from embedded struct, got %v", scanInterval)
+ }
+
+ if maxConcurrent, exists := result["max_concurrent"]; !exists || maxConcurrent != 1 {
+ t.Errorf("Expected max_concurrent=1 from embedded struct, got %v", maxConcurrent)
+ }
+
+ // Verify regular fields are included
+ if taskField, exists := result["task_field"]; !exists || taskField != 42 {
+ t.Errorf("Expected task_field=42, got %v", taskField)
+ }
+
+ // Verify nested struct is included as a whole
+ if nestedStruct, exists := result["nested_struct"]; !exists {
+ t.Errorf("Missing nested_struct field")
+ } else {
+ // The nested struct should be included as-is, not flattened
+ if nested, ok := nestedStruct.(struct {
+ NestedField string `json:"nested_field"`
+ }); !ok || nested.NestedField != "nested_value" {
+ t.Errorf("Expected nested_struct with NestedField='nested_value', got %v", nestedStruct)
+ }
+ }
+}
+
+func TestMapToStruct_WithEmbeddedStruct(t *testing.T) {
+ // Test data with all fields including embedded struct fields
+ data := map[string]interface{}{
+ "enabled": true,
+ "scan_interval_seconds": 2400,
+ "max_concurrent": 5,
+ "task_specific_field": 0.15,
+ "another_specific_field": "updated_value",
+ }
+
+ config := &TestTaskConfig{}
+ err := MapToStruct(data, config)
+
+ if err != nil {
+ t.Fatalf("MapToStruct failed: %v", err)
+ }
+
+ // Verify embedded struct fields were set
+ if config.Enabled != true {
+ t.Errorf("Expected Enabled=true, got %v", config.Enabled)
+ }
+
+ if config.ScanIntervalSeconds != 2400 {
+ t.Errorf("Expected ScanIntervalSeconds=2400, got %v", config.ScanIntervalSeconds)
+ }
+
+ if config.MaxConcurrent != 5 {
+ t.Errorf("Expected MaxConcurrent=5, got %v", config.MaxConcurrent)
+ }
+
+ // Verify regular fields were set
+ if config.TaskSpecificField != 0.15 {
+ t.Errorf("Expected TaskSpecificField=0.15, got %v", config.TaskSpecificField)
+ }
+
+ if config.AnotherSpecificField != "updated_value" {
+ t.Errorf("Expected AnotherSpecificField='updated_value', got %v", config.AnotherSpecificField)
+ }
+}
+
+func TestMapToStruct_PartialData(t *testing.T) {
+ // Test with only some fields present (simulating form data)
+ data := map[string]interface{}{
+ "enabled": false,
+ "max_concurrent": 2,
+ "task_specific_field": 0.30,
+ }
+
+ // Start with some initial values
+ config := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 1,
+ },
+ TaskSpecificField: 0.20,
+ AnotherSpecificField: "initial_value",
+ }
+
+ err := MapToStruct(data, config)
+
+ if err != nil {
+ t.Fatalf("MapToStruct failed: %v", err)
+ }
+
+ // Verify updated fields
+ if config.Enabled != false {
+ t.Errorf("Expected Enabled=false (updated), got %v", config.Enabled)
+ }
+
+ if config.MaxConcurrent != 2 {
+ t.Errorf("Expected MaxConcurrent=2 (updated), got %v", config.MaxConcurrent)
+ }
+
+ if config.TaskSpecificField != 0.30 {
+ t.Errorf("Expected TaskSpecificField=0.30 (updated), got %v", config.TaskSpecificField)
+ }
+
+ // Verify unchanged fields remain the same
+ if config.ScanIntervalSeconds != 1800 {
+ t.Errorf("Expected ScanIntervalSeconds=1800 (unchanged), got %v", config.ScanIntervalSeconds)
+ }
+
+ if config.AnotherSpecificField != "initial_value" {
+ t.Errorf("Expected AnotherSpecificField='initial_value' (unchanged), got %v", config.AnotherSpecificField)
+ }
+}
+
+func TestRoundTripSerialization(t *testing.T) {
+ // Test complete round-trip: struct -> map -> struct
+ original := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 3600,
+ MaxConcurrent: 4,
+ },
+ TaskSpecificField: 0.18,
+ AnotherSpecificField: "round_trip_test",
+ }
+
+ // Convert to map
+ dataMap := StructToMap(original)
+
+ // Convert back to struct
+ roundTrip := &TestTaskConfig{}
+ err := MapToStruct(dataMap, roundTrip)
+
+ if err != nil {
+ t.Fatalf("Round-trip MapToStruct failed: %v", err)
+ }
+
+ // Verify all fields match
+ if !reflect.DeepEqual(original.TestBaseConfig, roundTrip.TestBaseConfig) {
+ t.Errorf("BaseConfig mismatch:\nOriginal: %+v\nRound-trip: %+v", original.TestBaseConfig, roundTrip.TestBaseConfig)
+ }
+
+ if original.TaskSpecificField != roundTrip.TaskSpecificField {
+ t.Errorf("TaskSpecificField mismatch: %v != %v", original.TaskSpecificField, roundTrip.TaskSpecificField)
+ }
+
+ if original.AnotherSpecificField != roundTrip.AnotherSpecificField {
+ t.Errorf("AnotherSpecificField mismatch: %v != %v", original.AnotherSpecificField, roundTrip.AnotherSpecificField)
+ }
+}
+
+func TestStructToMap_EmptyStruct(t *testing.T) {
+ config := &TestTaskConfig{}
+ result := StructToMap(config)
+
+ // Should still include all fields, even with zero values
+ expectedFields := []string{"enabled", "scan_interval_seconds", "max_concurrent", "task_specific_field", "another_specific_field"}
+
+ for _, field := range expectedFields {
+ if _, exists := result[field]; !exists {
+ t.Errorf("Missing field: %s", field)
+ }
+ }
+}
+
+func TestStructToMap_NilPointer(t *testing.T) {
+ var config *TestTaskConfig = nil
+ result := StructToMap(config)
+
+ if len(result) != 0 {
+ t.Errorf("Expected empty map for nil pointer, got %+v", result)
+ }
+}
+
+func TestMapToStruct_InvalidInput(t *testing.T) {
+ data := map[string]interface{}{
+ "enabled": "not_a_bool", // Wrong type
+ }
+
+ config := &TestTaskConfig{}
+ err := MapToStruct(data, config)
+
+ if err == nil {
+ t.Errorf("Expected error for invalid input type, but got none")
+ }
+}
+
+func TestMapToStruct_NonPointer(t *testing.T) {
+ data := map[string]interface{}{
+ "enabled": true,
+ }
+
+ config := TestTaskConfig{} // Not a pointer
+ err := MapToStruct(data, config)
+
+ if err == nil {
+ t.Errorf("Expected error for non-pointer input, but got none")
+ }
+}
+
+// Benchmark tests to ensure performance is reasonable
+func BenchmarkStructToMap(b *testing.B) {
+ config := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 3,
+ },
+ TaskSpecificField: 0.25,
+ AnotherSpecificField: "benchmark_test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _ = StructToMap(config)
+ }
+}
+
+func BenchmarkMapToStruct(b *testing.B) {
+ data := map[string]interface{}{
+ "enabled": true,
+ "scan_interval_seconds": 1800,
+ "max_concurrent": 3,
+ "task_specific_field": 0.25,
+ "another_specific_field": "benchmark_test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ config := &TestTaskConfig{}
+ _ = MapToStruct(data, config)
+ }
+}
+
+func BenchmarkRoundTrip(b *testing.B) {
+ original := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 3,
+ },
+ TaskSpecificField: 0.25,
+ AnotherSpecificField: "benchmark_test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ dataMap := StructToMap(original)
+ roundTrip := &TestTaskConfig{}
+ _ = MapToStruct(dataMap, roundTrip)
+ }
+}
diff --git a/weed/worker/tasks/base/typed_task.go b/weed/worker/tasks/base/typed_task.go
new file mode 100644
index 000000000..9d2839607
--- /dev/null
+++ b/weed/worker/tasks/base/typed_task.go
@@ -0,0 +1,218 @@
+package base
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// BaseTypedTask provides a base implementation for typed tasks with logger support
+type BaseTypedTask struct {
+ taskType types.TaskType
+ taskID string
+ progress float64
+ progressCallback func(float64)
+ cancelled bool
+ mutex sync.RWMutex
+
+ // Logger functionality
+ logger tasks.TaskLogger
+ loggerConfig types.TaskLoggerConfig
+}
+
+// NewBaseTypedTask creates a new base typed task
+func NewBaseTypedTask(taskType types.TaskType) *BaseTypedTask {
+ return &BaseTypedTask{
+ taskType: taskType,
+ progress: 0.0,
+ loggerConfig: types.TaskLoggerConfig{
+ BaseLogDir: "/data/task_logs",
+ MaxTasks: 100,
+ MaxLogSizeMB: 10,
+ EnableConsole: true,
+ },
+ }
+}
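+
+// Concrete tasks embed BaseTypedTask and override ExecuteTyped (sketch only;
+// exampleTask is a hypothetical name):
+//
+//	type exampleTask struct {
+//		*BaseTypedTask
+//	}
+//
+//	func (t *exampleTask) ExecuteTyped(params *worker_pb.TaskParams) error {
+//		t.LogInfo("starting volume %d", params.VolumeId)
+//		// ... do the work, checking t.IsCancelled() and calling t.SetProgress ...
+//		return nil
+//	}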
+
+// GetType returns the task type
+func (bt *BaseTypedTask) GetType() types.TaskType {
+ return bt.taskType
+}
+
+// IsCancellable returns whether the task can be cancelled
+func (bt *BaseTypedTask) IsCancellable() bool {
+ return true // Most tasks can be cancelled
+}
+
+// Cancel cancels the task
+func (bt *BaseTypedTask) Cancel() error {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+ bt.cancelled = true
+ return nil
+}
+
+// IsCancelled returns whether the task has been cancelled
+func (bt *BaseTypedTask) IsCancelled() bool {
+ bt.mutex.RLock()
+ defer bt.mutex.RUnlock()
+ return bt.cancelled
+}
+
+// GetProgress returns the current progress (0-100)
+func (bt *BaseTypedTask) GetProgress() float64 {
+ bt.mutex.RLock()
+ defer bt.mutex.RUnlock()
+ return bt.progress
+}
+
+// SetProgress sets the current progress and calls the callback if set
+func (bt *BaseTypedTask) SetProgress(progress float64) {
+ bt.mutex.Lock()
+ callback := bt.progressCallback
+ bt.progress = progress
+ bt.mutex.Unlock()
+
+ if callback != nil {
+ callback(progress)
+ }
+}
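+
+// The callback is invoked after the mutex is released, so a callback that
+// re-enters the task (for example, calling GetProgress) cannot deadlock.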
+
+// SetProgressCallback sets the progress callback function
+func (bt *BaseTypedTask) SetProgressCallback(callback func(float64)) {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+ bt.progressCallback = callback
+}
+
+// SetLoggerConfig sets the logger configuration for this task
+func (bt *BaseTypedTask) SetLoggerConfig(config types.TaskLoggerConfig) {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+ bt.loggerConfig = config
+}
+
+// convertToTasksLoggerConfig converts types.TaskLoggerConfig to tasks.TaskLoggerConfig
+func convertToTasksLoggerConfig(config types.TaskLoggerConfig) tasks.TaskLoggerConfig {
+ return tasks.TaskLoggerConfig{
+ BaseLogDir: config.BaseLogDir,
+ MaxTasks: config.MaxTasks,
+ MaxLogSizeMB: config.MaxLogSizeMB,
+ EnableConsole: config.EnableConsole,
+ }
+}
+
+// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface)
+func (bt *BaseTypedTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+
+ bt.taskID = taskID
+
+ // Convert the logger config to the tasks package type
+ tasksLoggerConfig := convertToTasksLoggerConfig(bt.loggerConfig)
+
+ logger, err := tasks.NewTaskLogger(taskID, bt.taskType, workerID, params, tasksLoggerConfig)
+ if err != nil {
+ return fmt.Errorf("failed to initialize task logger: %w", err)
+ }
+
+ bt.logger = logger
+ bt.logger.Info("BaseTypedTask initialized for task %s (type: %s)", taskID, bt.taskType)
+
+ return nil
+}
+
+// GetTaskLogger returns the task logger (LoggerProvider interface)
+func (bt *BaseTypedTask) GetTaskLogger() types.TaskLogger {
+ bt.mutex.RLock()
+ defer bt.mutex.RUnlock()
+ return bt.logger
+}
+
+// LogInfo logs an info message
+func (bt *BaseTypedTask) LogInfo(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Info(message, args...)
+ }
+}
+
+// LogWarning logs a warning message
+func (bt *BaseTypedTask) LogWarning(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Warning(message, args...)
+ }
+}
+
+// LogError logs an error message
+func (bt *BaseTypedTask) LogError(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Error(message, args...)
+ }
+}
+
+// LogDebug logs a debug message
+func (bt *BaseTypedTask) LogDebug(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Debug(message, args...)
+ }
+}
+
+// LogWithFields logs a message with structured fields
+func (bt *BaseTypedTask) LogWithFields(level string, message string, fields map[string]interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.LogWithFields(level, message, fields)
+ }
+}
+
+// ValidateTyped provides basic validation for typed parameters
+func (bt *BaseTypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
+ if params == nil {
+ return errors.New("task parameters cannot be nil")
+ }
+ if params.VolumeId == 0 {
+ return errors.New("volume_id is required")
+ }
+ if params.Server == "" {
+ return errors.New("server is required")
+ }
+ return nil
+}
+
+// EstimateTimeTyped provides a default time estimation
+func (bt *BaseTypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
+ // Default estimation - concrete tasks should override this
+ return 5 * time.Minute
+}
+
+// ExecuteTyped is a placeholder that concrete tasks must implement
+func (bt *BaseTypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
+ panic("ExecuteTyped must be implemented by concrete task types")
+}
diff --git a/weed/worker/tasks/config_update_registry.go b/weed/worker/tasks/config_update_registry.go
new file mode 100644
index 000000000..649c8b384
--- /dev/null
+++ b/weed/worker/tasks/config_update_registry.go
@@ -0,0 +1,67 @@
+package tasks
+
+import (
+ "sync"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// ConfigUpdateFunc is a function type for updating task configurations
+type ConfigUpdateFunc func(configPersistence interface{}) error
+
+// ConfigUpdateRegistry manages config update functions for all task types
+type ConfigUpdateRegistry struct {
+ updaters map[types.TaskType]ConfigUpdateFunc
+ mutex sync.RWMutex
+}
+
+var (
+ globalConfigUpdateRegistry *ConfigUpdateRegistry
+ configUpdateRegistryOnce sync.Once
+)
+
+// GetGlobalConfigUpdateRegistry returns the global config update registry (singleton)
+func GetGlobalConfigUpdateRegistry() *ConfigUpdateRegistry {
+ configUpdateRegistryOnce.Do(func() {
+ globalConfigUpdateRegistry = &ConfigUpdateRegistry{
+ updaters: make(map[types.TaskType]ConfigUpdateFunc),
+ }
+ glog.V(1).Infof("Created global config update registry")
+ })
+ return globalConfigUpdateRegistry
+}
+
+// RegisterConfigUpdater registers a config update function for a task type
+func (r *ConfigUpdateRegistry) RegisterConfigUpdater(taskType types.TaskType, updateFunc ConfigUpdateFunc) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ r.updaters[taskType] = updateFunc
+ glog.V(1).Infof("Registered config updater for task type: %s", taskType)
+}
+
+// UpdateAllConfigs updates configurations for all registered task types
+func (r *ConfigUpdateRegistry) UpdateAllConfigs(configPersistence interface{}) {
+ r.mutex.RLock()
+ updaters := make(map[types.TaskType]ConfigUpdateFunc)
+ for k, v := range r.updaters {
+ updaters[k] = v
+ }
+ r.mutex.RUnlock()
+
+ for taskType, updateFunc := range updaters {
+ if err := updateFunc(configPersistence); err != nil {
+ glog.Warningf("Failed to load %s configuration from persistence: %v", taskType, err)
+ } else {
+ glog.V(1).Infof("Loaded %s configuration from persistence", taskType)
+ }
+ }
+
+ glog.V(1).Infof("All task configurations loaded from persistence")
+}
+
+// AutoRegisterConfigUpdater is a convenience function for registering config updaters
+func AutoRegisterConfigUpdater(taskType types.TaskType, updateFunc ConfigUpdateFunc) {
+ registry := GetGlobalConfigUpdateRegistry()
+ registry.RegisterConfigUpdater(taskType, updateFunc)
+}
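+
+// Illustrative registration (the closure body is a sketch):
+//
+//	tasks.AutoRegisterConfigUpdater(types.TaskTypeBalance, func(p interface{}) error {
+//		// reload this task type's configuration from the persistence layer
+//		return nil
+//	})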
diff --git a/weed/worker/tasks/erasure_coding/config.go b/weed/worker/tasks/erasure_coding/config.go
new file mode 100644
index 000000000..1f70fb8db
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/config.go
@@ -0,0 +1,207 @@
+package erasure_coding
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with erasure coding specific settings
+type Config struct {
+ base.BaseConfig
+ QuietForSeconds int `json:"quiet_for_seconds"`
+ FullnessRatio float64 `json:"fullness_ratio"`
+ CollectionFilter string `json:"collection_filter"`
+ MinSizeMB int `json:"min_size_mb"`
+}
+
+// NewDefaultConfig creates a new default erasure coding configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 60 * 60, // 1 hour
+ MaxConcurrent: 1,
+ },
+ QuietForSeconds: 300, // 5 minutes
+ FullnessRatio: 0.8, // 80%
+ CollectionFilter: "",
+ MinSizeMB: 30, // 30MB default keeps smaller volumes eligible
+ }
+}
+
+// GetConfigSpec returns the configuration schema for erasure coding tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Erasure Coding Tasks",
+ Description: "Whether erasure coding tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic erasure coding task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 60 * 60,
+ MinValue: 10 * 60,
+ MaxValue: 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing erasure coding",
+ HelpText: "The system will check for volumes that need erasure coding at this interval",
+ Placeholder: "1",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 1,
+ MinValue: 1,
+ MaxValue: 5,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of erasure coding tasks that can run simultaneously",
+ HelpText: "Limits the number of erasure coding operations running at the same time",
+ Placeholder: "1 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "quiet_for_seconds",
+ JSONName: "quiet_for_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 300,
+ MinValue: 60,
+ MaxValue: 3600,
+ Required: true,
+ DisplayName: "Quiet Period",
+ Description: "Minimum time volume must be quiet before erasure coding",
+ HelpText: "Volume must not be modified for this duration before erasure coding",
+ Placeholder: "5",
+ Unit: config.UnitMinutes,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "fullness_ratio",
+ JSONName: "fullness_ratio",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.8,
+ MinValue: 0.1,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Fullness Ratio",
+ Description: "Minimum fullness ratio to trigger erasure coding",
+ HelpText: "Only volumes with this fullness ratio or higher will be erasure coded",
+ Placeholder: "0.80 (80%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "collection_filter",
+ JSONName: "collection_filter",
+ Type: config.FieldTypeString,
+ DefaultValue: "",
+ Required: false,
+ DisplayName: "Collection Filter",
+ Description: "Only process volumes from specific collections",
+ HelpText: "Leave empty to process all collections, or specify collection name",
+ Placeholder: "my_collection",
+ InputType: "text",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_size_mb",
+ JSONName: "min_size_mb",
+ Type: config.FieldTypeInt,
+ DefaultValue: 30,
+ MinValue: 1,
+ MaxValue: 1000,
+ Required: true,
+ DisplayName: "Minimum Size (MB)",
+ Description: "Minimum volume size to consider for erasure coding",
+ HelpText: "Only volumes larger than this size will be considered for erasure coding",
+ Placeholder: "30",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
+
+// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
+func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: float64(c.FullnessRatio),
+ QuietForSeconds: int32(c.QuietForSeconds),
+ MinVolumeSizeMb: int32(c.MinSizeMB),
+ CollectionFilter: c.CollectionFilter,
+ },
+ },
+ }
+}
+
+// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
+func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+
+ // Set general TaskPolicy fields
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
+
+ // Set erasure coding-specific fields from the task config
+ if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil {
+ c.FullnessRatio = float64(ecConfig.FullnessRatio)
+ c.QuietForSeconds = int(ecConfig.QuietForSeconds)
+ c.MinSizeMB = int(ecConfig.MinVolumeSizeMb)
+ c.CollectionFilter = ecConfig.CollectionFilter
+ }
+
+ return nil
+}
+
+// LoadConfigFromPersistence loads configuration from the persistence layer if available
+func LoadConfigFromPersistence(configPersistence interface{}) *Config {
+ config := NewDefaultConfig()
+
+ // Try to load from persistence if available
+ if persistence, ok := configPersistence.(interface {
+ LoadErasureCodingTaskPolicy() (*worker_pb.TaskPolicy, error)
+ }); ok {
+ if policy, err := persistence.LoadErasureCodingTaskPolicy(); err == nil && policy != nil {
+ if err := config.FromTaskPolicy(policy); err == nil {
+ glog.V(1).Infof("Loaded erasure coding configuration from persistence")
+ return config
+ }
+ }
+ }
+
+ glog.V(1).Infof("Using default erasure coding configuration")
+ return config
+}
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
new file mode 100644
index 000000000..1a2558396
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -0,0 +1,140 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for erasure coding tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+	ecConfig, ok := config.(*Config)
+	if !ok {
+		return nil, fmt.Errorf("unexpected config type %T for erasure coding detection", config)
+	}
+ var results []*types.TaskDetectionResult
+ now := time.Now()
+ quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
+ minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum
+
+ skippedAlreadyEC := 0
+ skippedTooSmall := 0
+ skippedCollectionFilter := 0
+ skippedQuietTime := 0
+ skippedFullness := 0
+
+ for _, metric := range metrics {
+ // Skip if already EC volume
+ if metric.IsECVolume {
+ skippedAlreadyEC++
+ continue
+ }
+
+ // Check minimum size requirement
+ if metric.Size < minSizeBytes {
+ skippedTooSmall++
+ continue
+ }
+
+ // Check collection filter if specified
+ if ecConfig.CollectionFilter != "" {
+ // Parse comma-separated collections
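+			// e.g. a filter of "logs,images" admits only volumes in collection "logs" or "images"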
+ allowedCollections := make(map[string]bool)
+ for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
+ allowedCollections[strings.TrimSpace(collection)] = true
+ }
+ // Skip if volume's collection is not in the allowed list
+ if !allowedCollections[metric.Collection] {
+ skippedCollectionFilter++
+ continue
+ }
+ }
+
+ // Check quiet duration and fullness criteria
+ if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeErasureCoding,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: types.TaskPriorityLow, // EC is not urgent
+				Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
+					metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
+					float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
+ ScheduleAt: now,
+ }
+ results = append(results, result)
+		} else {
+			// Track why this volume was skipped so the summary log below stays accurate
+			if metric.Age < quietThreshold {
+				skippedQuietTime++
+			}
+			if metric.FullnessRatio < ecConfig.FullnessRatio {
+				skippedFullness++
+			}
+		}
+ }
+
+ // Log debug summary if no tasks were created
+ if len(results) == 0 && len(metrics) > 0 {
+ totalVolumes := len(metrics)
+ glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
+ totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness)
+
+ // Show details for first few volumes
+ for i, metric := range metrics {
+			if i >= 3 || metric.IsECVolume { // Show only non-EC volumes among the first 3 metrics
+ continue
+ }
+ sizeMB := float64(metric.Size) / (1024 * 1024)
+ glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
+ metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute),
+ metric.FullnessRatio*100, ecConfig.FullnessRatio*100)
+ }
+ }
+
+ return results, nil
+}
+
+// Scheduling implements the scheduling logic for erasure coding tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+	ecConfig, ok := config.(*Config)
+	if !ok {
+		glog.Errorf("unexpected config type %T for erasure coding scheduling", config)
+		return false
+	}
+
+ // Check if we have available workers
+ if len(availableWorkers) == 0 {
+ return false
+ }
+
+ // Count running EC tasks
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeErasureCoding {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningCount >= ecConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check if any worker can handle EC tasks
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeErasureCoding {
+ return true
+ }
+ }
+ }
+
+ return false
+}
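+
+// Example (illustrative): with MaxConcurrent = 1, a second pending EC task is
+// held back until the running one completes, even when EC-capable workers are idle.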
diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go
index 641dfc6b5..8dc7a1cd0 100644
--- a/weed/worker/tasks/erasure_coding/ec.go
+++ b/weed/worker/tasks/erasure_coding/ec.go
@@ -1,79 +1,785 @@
package erasure_coding
import (
+ "context"
"fmt"
+ "io"
+ "math"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
)
-// Task implements erasure coding operation to convert volumes to EC format
+// Task implements comprehensive erasure coding with protobuf parameters
type Task struct {
- *tasks.BaseTask
- server string
- volumeID uint32
+ *base.BaseTypedTask
+
+ // Current task state
+ sourceServer string
+ volumeID uint32
+ collection string
+ workDir string
+ masterClient string
+ grpcDialOpt grpc.DialOption
+
+ // EC parameters from protobuf
+ destinations []*worker_pb.ECDestination // Disk-aware destinations
+ existingShardLocations []*worker_pb.ExistingECShardLocation // Existing shards to cleanup
+ estimatedShardSize uint64
+ dataShards int
+ parityShards int
+ cleanupSource bool
+
+ // Progress tracking
+ currentStep string
+ stepProgress map[string]float64
}
-// NewTask creates a new erasure coding task instance
-func NewTask(server string, volumeID uint32) *Task {
+// NewTask creates a new erasure coding task
+func NewTask() types.TypedTaskInterface {
task := &Task{
- BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding),
- server: server,
- volumeID: volumeID,
+ BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeErasureCoding),
+ masterClient: "localhost:9333", // Default master client
+ workDir: "/tmp/seaweedfs_ec_work", // Default work directory
+ grpcDialOpt: grpc.WithTransportCredentials(insecure.NewCredentials()), // Default to insecure
+ dataShards: erasure_coding.DataShardsCount, // Use package constant
+ parityShards: erasure_coding.ParityShardsCount, // Use package constant
+ stepProgress: make(map[string]float64),
}
return task
}
-// Execute executes the erasure coding task
-func (t *Task) Execute(params types.TaskParams) error {
- glog.Infof("Starting erasure coding task for volume %d on server %s", t.volumeID, t.server)
+// ValidateTyped validates the typed parameters for EC task
+func (t *Task) ValidateTyped(params *worker_pb.TaskParams) error {
+ // Basic validation from base class
+ if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
+ return err
+ }
+
+ // Check that we have EC-specific parameters
+ ecParams := params.GetErasureCodingParams()
+ if ecParams == nil {
+ return fmt.Errorf("erasure_coding_params is required for EC task")
+ }
+
+ // Require destinations
+ if len(ecParams.Destinations) == 0 {
+ return fmt.Errorf("destinations must be specified for EC task")
+ }
+
+ // DataShards and ParityShards are constants from erasure_coding package
+ expectedDataShards := int32(erasure_coding.DataShardsCount)
+ expectedParityShards := int32(erasure_coding.ParityShardsCount)
+
+ if ecParams.DataShards > 0 && ecParams.DataShards != expectedDataShards {
+ return fmt.Errorf("data_shards must be %d (fixed constant), got %d", expectedDataShards, ecParams.DataShards)
+ }
+ if ecParams.ParityShards > 0 && ecParams.ParityShards != expectedParityShards {
+ return fmt.Errorf("parity_shards must be %d (fixed constant), got %d", expectedParityShards, ecParams.ParityShards)
+ }
+
+ // Validate destination count
+ destinationCount := len(ecParams.Destinations)
+ totalShards := expectedDataShards + expectedParityShards
+ if totalShards > int32(destinationCount) {
+ return fmt.Errorf("insufficient destinations: need %d, have %d", totalShards, destinationCount)
+ }
+
+ return nil
+}
+
+// EstimateTimeTyped estimates the time needed for EC processing based on protobuf parameters
+func (t *Task) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
+	baseTime := 20 * time.Minute // Conservative floor: copying, encoding, and distribution all contribute
+
+ ecParams := params.GetErasureCodingParams()
+ if ecParams != nil && ecParams.EstimatedShardSize > 0 {
+ // More accurate estimate based on shard size
+ // Account for copying, encoding, and distribution
+ gbSize := ecParams.EstimatedShardSize / (1024 * 1024 * 1024)
+ estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB
+ if estimatedTime > baseTime {
+ return estimatedTime
+ }
+ }
+
+ return baseTime
+}
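+
+// For example, an estimated shard size of 15 GiB gives 15*2 = 30 minutes, which
+// exceeds the 20-minute floor and becomes the estimate; anything at or below
+// 10 GiB falls back to the floor.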
+
+// ExecuteTyped implements the actual erasure coding workflow with typed parameters
+func (t *Task) ExecuteTyped(params *worker_pb.TaskParams) error {
+ // Extract basic parameters
+ t.volumeID = params.VolumeId
+ t.sourceServer = params.Server
+ t.collection = params.Collection
- // Simulate erasure coding operation with progress updates
- steps := []struct {
- name string
- duration time.Duration
- progress float64
- }{
- {"Analyzing volume", 2 * time.Second, 15},
- {"Creating EC shards", 5 * time.Second, 50},
- {"Verifying shards", 2 * time.Second, 75},
- {"Finalizing EC volume", 1 * time.Second, 100},
+ // Extract EC-specific parameters
+ ecParams := params.GetErasureCodingParams()
+ if ecParams != nil {
+ t.destinations = ecParams.Destinations // Store disk-aware destinations
+ t.existingShardLocations = ecParams.ExistingShardLocations // Store existing shards for cleanup
+ t.estimatedShardSize = ecParams.EstimatedShardSize
+ t.cleanupSource = ecParams.CleanupSource
+
+ // DataShards and ParityShards are constants, don't override from parameters
+ // t.dataShards and t.parityShards are already set to constants in NewTask
+
+ if ecParams.WorkingDir != "" {
+ t.workDir = ecParams.WorkingDir
+ }
+ if ecParams.MasterClient != "" {
+ t.masterClient = ecParams.MasterClient
+ }
}
- for _, step := range steps {
- if t.IsCancelled() {
- return fmt.Errorf("erasure coding task cancelled")
+ // Determine available destinations for logging
+ var availableDestinations []string
+ for _, dest := range t.destinations {
+ availableDestinations = append(availableDestinations, fmt.Sprintf("%s(disk:%d)", dest.Node, dest.DiskId))
+ }
+
+ glog.V(1).Infof("Starting EC task for volume %d: %s -> %v (data:%d, parity:%d)",
+ t.volumeID, t.sourceServer, availableDestinations, t.dataShards, t.parityShards)
+
+ // Create unique working directory for this task
+ taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
+ if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
+ return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
+ }
+ glog.V(1).Infof("WORKFLOW: Created working directory: %s", taskWorkDir)
+
+ // Ensure cleanup of working directory
+ defer func() {
+ if err := os.RemoveAll(taskWorkDir); err != nil {
+ glog.Warningf("Failed to cleanup working directory %s: %v", taskWorkDir, err)
+ } else {
+ glog.V(1).Infof("WORKFLOW: Cleaned up working directory: %s", taskWorkDir)
}
+ }()
+
+ // Step 1: Collect volume locations from master
+ glog.V(1).Infof("WORKFLOW STEP 1: Collecting volume locations from master")
+ t.SetProgress(5.0)
+ volumeId := needle.VolumeId(t.volumeID)
+ volumeLocations, err := t.collectVolumeLocations(volumeId)
+ if err != nil {
+ return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Found volume %d on %d servers: %v", t.volumeID, len(volumeLocations), volumeLocations)
- glog.V(1).Infof("Erasure coding task step: %s", step.name)
- t.SetProgress(step.progress)
+ // Convert ServerAddress slice to string slice
+ var locationStrings []string
+ for _, addr := range volumeLocations {
+ locationStrings = append(locationStrings, string(addr))
+ }
- // Simulate work
- time.Sleep(step.duration)
+ // Step 2: Check if volume has sufficient size for EC encoding
+ if !t.shouldPerformECEncoding(locationStrings) {
+ glog.Infof("Volume %d does not meet EC encoding criteria, skipping", t.volumeID)
+ t.SetProgress(100.0)
+ return nil
}
- glog.Infof("Erasure coding task completed for volume %d on server %s", t.volumeID, t.server)
+ // Step 2A: Cleanup existing EC shards if any
+ glog.V(1).Infof("WORKFLOW STEP 2A: Cleaning up existing EC shards for volume %d", t.volumeID)
+ t.SetProgress(10.0)
+ err = t.cleanupExistingEcShards()
+ if err != nil {
+ glog.Warningf("Failed to cleanup existing EC shards (continuing anyway): %v", err)
+ // Don't fail the task - this is just cleanup
+ }
+ glog.V(1).Infof("WORKFLOW: Existing EC shards cleanup completed for volume %d", t.volumeID)
+
+	// Step 2B: Mark volume readonly on all replica servers
+ glog.V(1).Infof("WORKFLOW STEP 2B: Marking volume %d readonly on all replica servers", t.volumeID)
+ t.SetProgress(15.0)
+ err = t.markVolumeReadonlyOnAllReplicas(needle.VolumeId(t.volumeID), locationStrings)
+ if err != nil {
+ return fmt.Errorf("failed to mark volume readonly: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Volume %d marked readonly on all replicas", t.volumeID)
+
+	// Step 3: Copy volume files (.dat, .idx) to the EC worker
+ glog.V(1).Infof("WORKFLOW STEP 3: Copying volume files from source server %s to EC worker", t.sourceServer)
+ t.SetProgress(25.0)
+ localVolumeFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to copy volume files to EC worker: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Volume files copied to EC worker: %v", localVolumeFiles)
+
+	// Step 4: Generate EC shards locally on the EC worker
+ glog.V(1).Infof("WORKFLOW STEP 4: Generating EC shards locally on EC worker")
+ t.SetProgress(40.0)
+ localShardFiles, err := t.generateEcShardsLocally(localVolumeFiles, taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to generate EC shards locally: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: EC shards generated locally: %d shard files", len(localShardFiles))
+
+	// Step 5: Distribute shards from the EC worker to destination servers
+ glog.V(1).Infof("WORKFLOW STEP 5: Distributing EC shards from worker to destination servers")
+ t.SetProgress(60.0)
+ err = t.distributeEcShardsFromWorker(localShardFiles)
+ if err != nil {
+ return fmt.Errorf("failed to distribute EC shards from worker: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: EC shards distributed to all destination servers")
+
+	// Step 6: Mount EC shards on destination servers
+ glog.V(1).Infof("WORKFLOW STEP 6: Mounting EC shards on destination servers")
+ t.SetProgress(80.0)
+ err = t.mountEcShardsOnDestinations()
+ if err != nil {
+ return fmt.Errorf("failed to mount EC shards: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: EC shards mounted successfully")
+
+	// Step 7: Delete original volume from all locations
+ glog.V(1).Infof("WORKFLOW STEP 7: Deleting original volume %d from all replica servers", t.volumeID)
+ t.SetProgress(90.0)
+ err = t.deleteVolumeFromAllLocations(needle.VolumeId(t.volumeID), locationStrings)
+ if err != nil {
+ return fmt.Errorf("failed to delete original volume: %v", err)
+ }
+ glog.V(1).Infof("WORKFLOW: Original volume %d deleted from all locations", t.volumeID)
+
+ t.SetProgress(100.0)
+ glog.Infof("EC task completed successfully for volume %d", t.volumeID)
return nil
}
-// Validate validates the task parameters
-func (t *Task) Validate(params types.TaskParams) error {
- if params.VolumeID == 0 {
- return fmt.Errorf("volume_id is required")
+// collectVolumeLocations gets volume location from master (placeholder implementation)
+func (t *Task) collectVolumeLocations(volumeId needle.VolumeId) ([]pb.ServerAddress, error) {
+ // For now, return a placeholder implementation
+ // Full implementation would call master to get volume locations
+ return []pb.ServerAddress{pb.ServerAddress(t.sourceServer)}, nil
+}
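+
+// Note: because the placeholder only returns the source server, the readonly
+// and delete steps in ExecuteTyped currently reach a single replica; a full
+// implementation would resolve every replica location via the master
+// (t.masterClient) and return one pb.ServerAddress per replica.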
+
+// cleanupExistingEcShards deletes existing EC shards using planned locations
+func (t *Task) cleanupExistingEcShards() error {
+ if len(t.existingShardLocations) == 0 {
+ glog.V(1).Infof("No existing EC shards to cleanup for volume %d", t.volumeID)
+ return nil
}
- if params.Server == "" {
- return fmt.Errorf("server is required")
+
+ glog.V(1).Infof("Cleaning up existing EC shards for volume %d on %d servers", t.volumeID, len(t.existingShardLocations))
+
+ // Delete existing shards from each location using planned shard locations
+ for _, location := range t.existingShardLocations {
+ if len(location.ShardIds) == 0 {
+ continue
+ }
+
+ glog.V(1).Infof("Deleting existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(location.Node), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
+ VolumeId: t.volumeID,
+ Collection: t.collection,
+ ShardIds: location.ShardIds,
+ })
+ return deleteErr
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to delete existing EC shards %v from %s for volume %d: %v", location.ShardIds, location.Node, t.volumeID, err)
+ // Continue with other servers - don't fail the entire cleanup
+ } else {
+ glog.V(1).Infof("Successfully deleted existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID)
+ }
}
+
+ glog.V(1).Infof("Completed cleanup of existing EC shards for volume %d", t.volumeID)
return nil
}
-// EstimateTime estimates the time needed for the task
-func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
- // Base time for erasure coding operation
- baseTime := 30 * time.Second
+// shouldPerformECEncoding checks if the volume meets criteria for EC encoding
+func (t *Task) shouldPerformECEncoding(volumeLocations []string) bool {
+ // For now, always proceed with EC encoding if volume exists
+ // This can be extended with volume size checks, etc.
+ return len(volumeLocations) > 0
+}
- // Could adjust based on volume size or other factors
- return baseTime
+// markVolumeReadonlyOnAllReplicas marks the volume as readonly on all replica servers
+func (t *Task) markVolumeReadonlyOnAllReplicas(volumeId needle.VolumeId, volumeLocations []string) error {
+ glog.V(1).Infof("Marking volume %d readonly on %d servers", volumeId, len(volumeLocations))
+
+ // Mark volume readonly on all replica servers
+ for _, location := range volumeLocations {
+ glog.V(1).Infof("Marking volume %d readonly on %s", volumeId, location)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, markErr := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
+ VolumeId: uint32(volumeId),
+ })
+ return markErr
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to mark volume %d readonly on %s: %v", volumeId, location, err)
+ return fmt.Errorf("failed to mark volume %d readonly on %s: %v", volumeId, location, err)
+ }
+
+ glog.V(1).Infof("Successfully marked volume %d readonly on %s", volumeId, location)
+ }
+
+ glog.V(1).Infof("Successfully marked volume %d readonly on all %d locations", volumeId, len(volumeLocations))
+ return nil
+}
+
+// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker
+func (t *Task) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
+ localFiles := make(map[string]string)
+
+ // Copy .dat file
+ datFile := fmt.Sprintf("%s.dat", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID)))
+ err := t.copyFileFromSource(".dat", datFile)
+ if err != nil {
+ return nil, fmt.Errorf("failed to copy .dat file: %v", err)
+ }
+ localFiles["dat"] = datFile
+ glog.V(1).Infof("Copied .dat file to: %s", datFile)
+
+ // Copy .idx file
+ idxFile := fmt.Sprintf("%s.idx", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID)))
+ err = t.copyFileFromSource(".idx", idxFile)
+ if err != nil {
+ return nil, fmt.Errorf("failed to copy .idx file: %v", err)
+ }
+ localFiles["idx"] = idxFile
+ glog.V(1).Infof("Copied .idx file to: %s", idxFile)
+
+ return localFiles, nil
+}
+
+// copyFileFromSource copies a file from source server to local path using gRPC streaming
+func (t *Task) copyFileFromSource(ext, localPath string) error {
+ return operation.WithVolumeServerClient(false, pb.ServerAddress(t.sourceServer), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: t.volumeID,
+ Collection: t.collection,
+ Ext: ext,
+ StopOffset: uint64(math.MaxInt64),
+ })
+ if err != nil {
+ return fmt.Errorf("failed to initiate file copy: %v", err)
+ }
+
+ // Create local file
+ localFile, err := os.Create(localPath)
+ if err != nil {
+ return fmt.Errorf("failed to create local file %s: %v", localPath, err)
+ }
+ defer localFile.Close()
+
+ // Stream data and write to local file
+ totalBytes := int64(0)
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("failed to receive file data: %v", err)
+ }
+
+ if len(resp.FileContent) > 0 {
+ written, writeErr := localFile.Write(resp.FileContent)
+ if writeErr != nil {
+ return fmt.Errorf("failed to write to local file: %v", writeErr)
+ }
+ totalBytes += int64(written)
+ }
+ }
+
+ glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.sourceServer, localPath)
+ return nil
+ })
+}
+
+// generateEcShardsLocally generates EC shards from local volume files
+func (t *Task) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
+ datFile := localFiles["dat"]
+ idxFile := localFiles["idx"]
+
+ if datFile == "" || idxFile == "" {
+ return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
+ }
+
+ // Get base name without extension for EC operations
+ baseName := strings.TrimSuffix(datFile, ".dat")
+
+ shardFiles := make(map[string]string)
+
+ glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
+
+ // Generate EC shard files (.ec00 ~ .ec13)
+ if err := erasure_coding.WriteEcFiles(baseName); err != nil {
+ return nil, fmt.Errorf("failed to generate EC shard files: %v", err)
+ }
+
+ // Generate .ecx file from .idx
+ if err := erasure_coding.WriteSortedFileFromIdx(idxFile, ".ecx"); err != nil {
+ return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
+ }
+
+ // Collect generated shard file paths
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
+ if _, err := os.Stat(shardFile); err == nil {
+ shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile
+ }
+ }
+
+ // Add metadata files
+ ecxFile := idxFile + ".ecx"
+ if _, err := os.Stat(ecxFile); err == nil {
+ shardFiles["ecx"] = ecxFile
+ }
+
+ // Generate .vif file (volume info)
+ vifFile := baseName + ".vif"
+ // Create basic volume info - in a real implementation, this would come from the original volume
+ volumeInfo := &volume_server_pb.VolumeInfo{
+ Version: uint32(needle.GetCurrentVersion()),
+ }
+ if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
+ glog.Warningf("Failed to create .vif file: %v", err)
+ } else {
+ shardFiles["vif"] = vifFile
+ }
+
+ glog.V(1).Infof("Generated %d EC files locally", len(shardFiles))
+ return shardFiles, nil
+}
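+
+// With SeaweedFS's fixed 10 data + 4 parity layout, this yields 14 shard files
+// (.ec00 through .ec13) plus the .ecx index and, when written, the .vif file.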
+
+func (t *Task) copyEcShardsToDestinations() error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for EC shard distribution")
+ }
+
+ destinations := t.destinations
+
+ glog.V(1).Infof("Copying EC shards for volume %d to %d destinations", t.volumeID, len(destinations))
+
+ // Prepare shard IDs (0-13 for EC shards)
+ var shardIds []uint32
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardIds = append(shardIds, uint32(i))
+ }
+
+ // Distribute shards across destinations
+ var wg sync.WaitGroup
+ errorChan := make(chan error, len(destinations))
+
+ // Track which disks have already received metadata files (server+disk)
+ metadataFilesCopied := make(map[string]bool)
+ var metadataMutex sync.Mutex
+
+ // For each destination, copy a subset of shards
+ shardsPerDest := len(shardIds) / len(destinations)
+ remainder := len(shardIds) % len(destinations)
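+	// e.g. 14 shards over 4 destinations: shardsPerDest=3, remainder=2, so the
+	// first two destinations receive 4 shards each and the last two receive 3.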
+
+ shardOffset := 0
+ for i, dest := range destinations {
+ wg.Add(1)
+
+ shardsForThisDest := shardsPerDest
+ if i < remainder {
+ shardsForThisDest++ // Distribute remainder shards
+ }
+
+ destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
+ shardOffset += shardsForThisDest
+
+ go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
+ defer wg.Done()
+
+ if t.IsCancelled() {
+ errorChan <- fmt.Errorf("task cancelled during shard copy")
+ return
+ }
+
+ // Create disk-specific metadata key (server+disk)
+ diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
+
+ glog.V(1).Infof("Copying shards %v from %s to %s (disk %d)",
+ targetShardIds, t.sourceServer, destination.Node, destination.DiskId)
+
+ // Check if this disk needs metadata files (only once per disk)
+ metadataMutex.Lock()
+ needsMetadataFiles := !metadataFilesCopied[diskKey]
+ if needsMetadataFiles {
+ metadataFilesCopied[diskKey] = true
+ }
+ metadataMutex.Unlock()
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
+ VolumeId: uint32(t.volumeID),
+ Collection: t.collection,
+ ShardIds: targetShardIds,
+ CopyEcxFile: needsMetadataFiles, // Copy .ecx only once per disk
+ CopyEcjFile: needsMetadataFiles, // Copy .ecj only once per disk
+ CopyVifFile: needsMetadataFiles, // Copy .vif only once per disk
+ SourceDataNode: t.sourceServer,
+ DiskId: destination.DiskId, // Pass target disk ID
+ })
+ return copyErr
+ })
+
+ if err != nil {
+ errorChan <- fmt.Errorf("failed to copy shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
+ return
+ }
+
+ if needsMetadataFiles {
+ glog.V(1).Infof("Successfully copied shards %v and metadata files (.ecx, .ecj, .vif) to %s disk %d",
+ targetShardIds, destination.Node, destination.DiskId)
+ } else {
+ glog.V(1).Infof("Successfully copied shards %v to %s disk %d (metadata files already present)",
+ targetShardIds, destination.Node, destination.DiskId)
+ }
+ }(dest, destShardIds)
+ }
+
+ wg.Wait()
+ close(errorChan)
+
+	// Surface the first copy error, if any; the channel is closed, so this receive never blocks
+ if err := <-errorChan; err != nil {
+ return err
+ }
+
+ glog.V(1).Infof("Successfully copied all EC shards for volume %d", t.volumeID)
+ return nil
+}
+
+// distributeEcShardsFromWorker distributes locally generated EC shards to destination servers
+func (t *Task) distributeEcShardsFromWorker(localShardFiles map[string]string) error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for EC shard distribution")
+ }
+
+ destinations := t.destinations
+
+ glog.V(1).Infof("Distributing EC shards for volume %d from worker to %d destinations", t.volumeID, len(destinations))
+
+ // Prepare shard IDs (0-13 for EC shards)
+ var shardIds []uint32
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardIds = append(shardIds, uint32(i))
+ }
+
+ // Distribute shards across destinations
+ var wg sync.WaitGroup
+ errorChan := make(chan error, len(destinations))
+
+ // Track which disks have already received metadata files (server+disk)
+ metadataFilesCopied := make(map[string]bool)
+ var metadataMutex sync.Mutex
+
+ // For each destination, send a subset of shards
+ shardsPerDest := len(shardIds) / len(destinations)
+ remainder := len(shardIds) % len(destinations)
+
+ shardOffset := 0
+ for i, dest := range destinations {
+ wg.Add(1)
+
+ shardsForThisDest := shardsPerDest
+ if i < remainder {
+ shardsForThisDest++ // Distribute remainder shards
+ }
+
+ destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
+ shardOffset += shardsForThisDest
+
+ go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
+ defer wg.Done()
+
+ if t.IsCancelled() {
+ errorChan <- fmt.Errorf("task cancelled during shard distribution")
+ return
+ }
+
+ // Create disk-specific metadata key (server+disk)
+ diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
+
+ glog.V(1).Infof("Distributing shards %v from worker to %s (disk %d)",
+ targetShardIds, destination.Node, destination.DiskId)
+
+ // Check if this disk needs metadata files (only once per disk)
+ metadataMutex.Lock()
+ needsMetadataFiles := !metadataFilesCopied[diskKey]
+ if needsMetadataFiles {
+ metadataFilesCopied[diskKey] = true
+ }
+ metadataMutex.Unlock()
+
+ // Send shard files to destination using HTTP upload (simplified for now)
+ err := t.sendShardsToDestination(destination, targetShardIds, localShardFiles, needsMetadataFiles)
+ if err != nil {
+ errorChan <- fmt.Errorf("failed to send shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
+ return
+ }
+
+ if needsMetadataFiles {
+ glog.V(1).Infof("Successfully distributed shards %v and metadata files (.ecx, .vif) to %s disk %d",
+ targetShardIds, destination.Node, destination.DiskId)
+ } else {
+ glog.V(1).Infof("Successfully distributed shards %v to %s disk %d (metadata files already present)",
+ targetShardIds, destination.Node, destination.DiskId)
+ }
+ }(dest, destShardIds)
+ }
+
+ wg.Wait()
+ close(errorChan)
+
+	// Surface the first distribution error, if any
+ if err := <-errorChan; err != nil {
+ return err
+ }
+
+ glog.V(1).Infof("Completed distributing EC shards for volume %d", t.volumeID)
+ return nil
+}
+
+// sendShardsToDestination sends specific shard files from worker to a destination server (simplified)
+func (t *Task) sendShardsToDestination(destination *worker_pb.ECDestination, shardIds []uint32, localFiles map[string]string, includeMetadata bool) error {
+	// Placeholder: nothing is transferred yet; this only logs what would be sent
+	// and returns success so the surrounding workflow can be exercised end to end.
+ glog.V(2).Infof("Would send shards %v and metadata=%v to %s disk %d", shardIds, includeMetadata, destination.Node, destination.DiskId)
+
+ // TODO: Implement actual file upload to volume server
+ // This is a placeholder - actual implementation would:
+ // 1. Open each shard file locally
+ // 2. Upload via HTTP POST or gRPC stream to destination volume server
+ // 3. Volume server would save to the specified disk_id
+
+ return nil
+}
+
+// mountEcShardsOnDestinations mounts EC shards on all destination servers
+func (t *Task) mountEcShardsOnDestinations() error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for mounting EC shards")
+ }
+
+ destinations := t.destinations
+
+ glog.V(1).Infof("Mounting EC shards for volume %d on %d destinations", t.volumeID, len(destinations))
+
+ // Prepare all shard IDs (0-13)
+ var allShardIds []uint32
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ allShardIds = append(allShardIds, uint32(i))
+ }
+
+ var wg sync.WaitGroup
+ errorChan := make(chan error, len(destinations))
+
+ // Mount shards on each destination server
+ for _, dest := range destinations {
+ wg.Add(1)
+
+ go func(destination *worker_pb.ECDestination) {
+ defer wg.Done()
+
+ if t.IsCancelled() {
+ errorChan <- fmt.Errorf("task cancelled during shard mounting")
+ return
+ }
+
+ glog.V(1).Infof("Mounting EC shards on %s disk %d", destination.Node, destination.DiskId)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+ VolumeId: uint32(t.volumeID),
+ Collection: t.collection,
+ ShardIds: allShardIds, // Mount all available shards on each server
+ })
+ return mountErr
+ })
+
+ if err != nil {
+ // It's normal for some servers to not have all shards, so log as warning rather than error
+ glog.Warningf("Failed to mount some shards on %s disk %d (this may be normal): %v", destination.Node, destination.DiskId, err)
+ } else {
+ glog.V(1).Infof("Successfully mounted EC shards on %s disk %d", destination.Node, destination.DiskId)
+ }
+ }(dest)
+ }
+
+ wg.Wait()
+ close(errorChan)
+
+ // Check for any critical mounting errors
+ select {
+ case err := <-errorChan:
+ if err != nil {
+ glog.Warningf("Some shard mounting issues occurred: %v", err)
+ }
+ default:
+ // No errors
+ }
+
+ glog.V(1).Infof("Completed mounting EC shards for volume %d", t.volumeID)
+ return nil
+}
+
+// deleteVolumeFromAllLocations deletes the original volume from all replica servers
+func (t *Task) deleteVolumeFromAllLocations(volumeId needle.VolumeId, volumeLocations []string) error {
+ glog.V(1).Infof("Deleting original volume %d from %d locations", volumeId, len(volumeLocations))
+
+ for _, location := range volumeLocations {
+ glog.V(1).Infof("Deleting volume %d from %s", volumeId, location)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
+ VolumeId: uint32(volumeId),
+ OnlyEmpty: false, // Force delete even if not empty since we've already created EC shards
+ })
+ return deleteErr
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to delete volume %d from %s: %v", volumeId, location, err)
+ return fmt.Errorf("failed to delete volume %d from %s: %v", volumeId, location, err)
+ }
+
+ glog.V(1).Infof("Successfully deleted volume %d from %s", volumeId, location)
+ }
+
+ glog.V(1).Infof("Successfully deleted volume %d from all %d locations", volumeId, len(volumeLocations))
+ return nil
+}
+
+// Register the task in the global registry
+func init() {
+ types.RegisterGlobalTypedTask(types.TaskTypeErasureCoding, NewTask)
+ glog.V(1).Infof("Registered EC task")
}
diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go
deleted file mode 100644
index 0f8b5e376..000000000
--- a/weed/worker/tasks/erasure_coding/ec_detector.go
+++ /dev/null
@@ -1,139 +0,0 @@
-package erasure_coding
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// EcDetector implements erasure coding task detection
-type EcDetector struct {
- enabled bool
- volumeAgeHours int
- fullnessRatio float64
- scanInterval time.Duration
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*EcDetector)(nil)
-)
-
-// NewEcDetector creates a new erasure coding detector
-func NewEcDetector() *EcDetector {
- return &EcDetector{
- enabled: false, // Conservative default
- volumeAgeHours: 24 * 7, // 1 week
- fullnessRatio: 0.9, // 90% full
- scanInterval: 2 * time.Hour,
- }
-}
-
-// GetTaskType returns the task type
-func (d *EcDetector) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// ScanForTasks scans for volumes that should be converted to erasure coding
-func (d *EcDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- return nil, nil
- }
-
- var results []*types.TaskDetectionResult
- now := time.Now()
- ageThreshold := time.Duration(d.volumeAgeHours) * time.Hour
-
- for _, metric := range volumeMetrics {
- // Skip if already EC volume
- if metric.IsECVolume {
- continue
- }
-
- // Check age and fullness criteria
- if metric.Age >= ageThreshold && metric.FullnessRatio >= d.fullnessRatio {
- // Check if volume is read-only (safe for EC conversion)
- if !metric.IsReadOnly {
- continue
- }
-
- result := &types.TaskDetectionResult{
- TaskType: types.TaskTypeErasureCoding,
- VolumeID: metric.VolumeID,
- Server: metric.Server,
- Collection: metric.Collection,
- Priority: types.TaskPriorityLow, // EC is not urgent
- Reason: "Volume is old and full enough for EC conversion",
- Parameters: map[string]interface{}{
- "age_hours": int(metric.Age.Hours()),
- "fullness_ratio": metric.FullnessRatio,
- },
- ScheduleAt: now,
- }
- results = append(results, result)
- }
- }
-
- glog.V(2).Infof("EC detector found %d tasks to schedule", len(results))
- return results, nil
-}
-
-// ScanInterval returns how often this task type should be scanned
-func (d *EcDetector) ScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// IsEnabled returns whether this task type is enabled
-func (d *EcDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// Configuration setters
-
-func (d *EcDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
-}
-
-func (d *EcDetector) SetVolumeAgeHours(hours int) {
- d.volumeAgeHours = hours
-}
-
-func (d *EcDetector) SetFullnessRatio(ratio float64) {
- d.fullnessRatio = ratio
-}
-
-func (d *EcDetector) SetScanInterval(interval time.Duration) {
- d.scanInterval = interval
-}
-
-// GetVolumeAgeHours returns the current volume age threshold in hours
-func (d *EcDetector) GetVolumeAgeHours() int {
- return d.volumeAgeHours
-}
-
-// GetFullnessRatio returns the current fullness ratio threshold
-func (d *EcDetector) GetFullnessRatio() float64 {
- return d.fullnessRatio
-}
-
-// GetScanInterval returns the scan interval
-func (d *EcDetector) GetScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// ConfigureFromPolicy configures the detector based on the maintenance policy
-func (d *EcDetector) ConfigureFromPolicy(policy interface{}) {
- // Type assert to the maintenance policy type we expect
- if maintenancePolicy, ok := policy.(interface {
- GetECEnabled() bool
- GetECVolumeAgeHours() int
- GetECFullnessRatio() float64
- }); ok {
- d.SetEnabled(maintenancePolicy.GetECEnabled())
- d.SetVolumeAgeHours(maintenancePolicy.GetECVolumeAgeHours())
- d.SetFullnessRatio(maintenancePolicy.GetECFullnessRatio())
- } else {
- glog.V(1).Infof("Could not configure EC detector from policy: unsupported policy type")
- }
-}
diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go
index 6c4b5bf7f..62cfe6b56 100644
--- a/weed/worker/tasks/erasure_coding/ec_register.go
+++ b/weed/worker/tasks/erasure_coding/ec_register.go
@@ -2,80 +2,71 @@ package erasure_coding
import (
"fmt"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
-// Factory creates erasure coding task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
+// Global variable to hold the task definition for configuration updates
+var globalTaskDef *base.TaskDefinition
-// NewFactory creates a new erasure coding task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeErasureCoding,
- []string{"erasure_coding", "storage", "durability"},
- "Convert volumes to erasure coded format for improved durability",
- ),
- }
-}
-
-// Create creates a new erasure coding task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID)
- task.SetEstimatedDuration(task.EstimateTime(params))
+// Auto-register this task when the package is imported
+func init() {
+ RegisterErasureCodingTask()
- return task, nil
+ // Register config updater
+ tasks.AutoRegisterConfigUpdater(types.TaskTypeErasureCoding, UpdateConfigFromPersistence)
}
-// Shared detector and scheduler instances
-var (
- sharedDetector *EcDetector
- sharedScheduler *Scheduler
-)
+// RegisterErasureCodingTask registers the erasure coding task with the new architecture
+func RegisterErasureCodingTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*EcDetector, *Scheduler) {
- if sharedDetector == nil {
- sharedDetector = NewEcDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewScheduler()
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeErasureCoding,
+ Name: "erasure_coding",
+ DisplayName: "Erasure Coding",
+ Description: "Applies erasure coding to volumes for data protection",
+ Icon: "fas fa-shield-alt text-success",
+ Capabilities: []string{"erasure_coding", "data_protection"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: nil, // Uses typed task system - see init() in ec.go
+ DetectionFunc: Detection,
+ ScanInterval: 1 * time.Hour,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 1,
+ RepeatInterval: 24 * time.Hour,
}
- return sharedDetector, sharedScheduler
-}
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*EcDetector, *Scheduler) {
- return getSharedInstances()
+ // Store task definition globally for configuration updates
+ globalTaskDef = taskDef
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
-// Auto-register this task when the package is imported
-func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeErasureCoding, factory)
+// UpdateConfigFromPersistence updates the erasure coding configuration from persistence
+func UpdateConfigFromPersistence(configPersistence interface{}) error {
+ if globalTaskDef == nil {
+ return fmt.Errorf("erasure coding task not registered")
+ }
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
+ // Load configuration from persistence
+ newConfig := LoadConfigFromPersistence(configPersistence)
+ if newConfig == nil {
+ return fmt.Errorf("failed to load configuration from persistence")
+ }
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
+ // Update the task definition's config
+ globalTaskDef.Config = newConfig
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ glog.V(1).Infof("Updated erasure coding task configuration from persistence")
+ return nil
}
diff --git a/weed/worker/tasks/erasure_coding/ec_scheduler.go b/weed/worker/tasks/erasure_coding/ec_scheduler.go
deleted file mode 100644
index b2366bb06..000000000
--- a/weed/worker/tasks/erasure_coding/ec_scheduler.go
+++ /dev/null
@@ -1,114 +0,0 @@
-package erasure_coding
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// Scheduler implements erasure coding task scheduling
-type Scheduler struct {
- maxConcurrent int
- enabled bool
-}
-
-// NewScheduler creates a new erasure coding scheduler
-func NewScheduler() *Scheduler {
- return &Scheduler{
- maxConcurrent: 1, // Conservative default
- enabled: false, // Conservative default
- }
-}
-
-// GetTaskType returns the task type
-func (s *Scheduler) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// CanScheduleNow determines if an erasure coding task can be scheduled now
-func (s *Scheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- if !s.enabled {
- return false
- }
-
- // Check if we have available workers
- if len(availableWorkers) == 0 {
- return false
- }
-
- // Count running EC tasks
- runningCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeErasureCoding {
- runningCount++
- }
- }
-
- // Check concurrency limit
- if runningCount >= s.maxConcurrent {
- glog.V(3).Infof("EC scheduler: at concurrency limit (%d/%d)", runningCount, s.maxConcurrent)
- return false
- }
-
- // Check if any worker can handle EC tasks
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeErasureCoding {
- glog.V(3).Infof("EC scheduler: can schedule task for volume %d", task.VolumeID)
- return true
- }
- }
- }
-
- return false
-}
-
-// GetMaxConcurrent returns the maximum number of concurrent tasks
-func (s *Scheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating EC tasks
-func (s *Scheduler) GetDefaultRepeatInterval() time.Duration {
- return 24 * time.Hour // Don't repeat EC for 24 hours
-}
-
-// GetPriority returns the priority for this task
-func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority {
- return types.TaskPriorityLow // EC is not urgent
-}
-
-// WasTaskRecentlyCompleted checks if a similar task was recently completed
-func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool {
- // Don't repeat EC for 24 hours
- interval := 24 * time.Hour
- cutoff := now.Add(-interval)
-
- for _, completedTask := range completedTasks {
- if completedTask.Type == types.TaskTypeErasureCoding &&
- completedTask.VolumeID == task.VolumeID &&
- completedTask.Server == task.Server &&
- completedTask.Status == types.TaskStatusCompleted &&
- completedTask.CompletedAt != nil &&
- completedTask.CompletedAt.After(cutoff) {
- return true
- }
- }
- return false
-}
-
-// IsEnabled returns whether this task type is enabled
-func (s *Scheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// Configuration setters
-
-func (s *Scheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
-}
-
-func (s *Scheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
-}
diff --git a/weed/worker/tasks/erasure_coding/ui.go b/weed/worker/tasks/erasure_coding/ui.go
deleted file mode 100644
index e17cba89a..000000000
--- a/weed/worker/tasks/erasure_coding/ui.go
+++ /dev/null
@@ -1,309 +0,0 @@
-package erasure_coding
-
-import (
- "fmt"
- "html/template"
- "strconv"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// UIProvider provides the UI for erasure coding task configuration
-type UIProvider struct {
- detector *EcDetector
- scheduler *Scheduler
-}
-
-// NewUIProvider creates a new erasure coding UI provider
-func NewUIProvider(detector *EcDetector, scheduler *Scheduler) *UIProvider {
- return &UIProvider{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetTaskType returns the task type
-func (ui *UIProvider) GetTaskType() types.TaskType {
- return types.TaskTypeErasureCoding
-}
-
-// GetDisplayName returns the human-readable name
-func (ui *UIProvider) GetDisplayName() string {
- return "Erasure Coding"
-}
-
-// GetDescription returns a description of what this task does
-func (ui *UIProvider) GetDescription() string {
- return "Converts volumes to erasure coded format for improved data durability and fault tolerance"
-}
-
-// GetIcon returns the icon CSS class for this task type
-func (ui *UIProvider) GetIcon() string {
- return "fas fa-shield-alt text-info"
-}
-
-// ErasureCodingConfig represents the erasure coding configuration
-type ErasureCodingConfig struct {
- Enabled bool `json:"enabled"`
- VolumeAgeHoursSeconds int `json:"volume_age_hours_seconds"`
- FullnessRatio float64 `json:"fullness_ratio"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- ShardCount int `json:"shard_count"`
- ParityCount int `json:"parity_count"`
- CollectionFilter string `json:"collection_filter"`
-}
-
-// Helper functions for duration conversion
-func secondsToDuration(seconds int) time.Duration {
- return time.Duration(seconds) * time.Second
-}
-
-func durationToSeconds(d time.Duration) int {
- return int(d.Seconds())
-}
-
-// formatDurationForUser formats seconds as a user-friendly duration string
-func formatDurationForUser(seconds int) string {
- d := secondsToDuration(seconds)
- if d < time.Minute {
- return fmt.Sprintf("%ds", seconds)
- }
- if d < time.Hour {
- return fmt.Sprintf("%.0fm", d.Minutes())
- }
- if d < 24*time.Hour {
- return fmt.Sprintf("%.1fh", d.Hours())
- }
- return fmt.Sprintf("%.1fd", d.Hours()/24)
-}
-
-// RenderConfigForm renders the configuration form HTML
-func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) {
- config := ui.getCurrentECConfig()
-
- // Build form using the FormBuilder helper
- form := types.NewFormBuilder()
-
- // Detection Settings
- form.AddCheckboxField(
- "enabled",
- "Enable Erasure Coding Tasks",
- "Whether erasure coding tasks should be automatically created",
- config.Enabled,
- )
-
- form.AddNumberField(
- "volume_age_hours_seconds",
- "Volume Age Threshold",
- "Only apply erasure coding to volumes older than this duration",
- float64(config.VolumeAgeHoursSeconds),
- true,
- )
-
- form.AddNumberField(
- "scan_interval_seconds",
- "Scan Interval",
- "How often to scan for volumes needing erasure coding",
- float64(config.ScanIntervalSeconds),
- true,
- )
-
- // Scheduling Settings
- form.AddNumberField(
- "max_concurrent",
- "Max Concurrent Tasks",
- "Maximum number of erasure coding tasks that can run simultaneously",
- float64(config.MaxConcurrent),
- true,
- )
-
- // Erasure Coding Parameters
- form.AddNumberField(
- "shard_count",
- "Data Shards",
- "Number of data shards for erasure coding (recommended: 10)",
- float64(config.ShardCount),
- true,
- )
-
- form.AddNumberField(
- "parity_count",
- "Parity Shards",
- "Number of parity shards for erasure coding (recommended: 4)",
- float64(config.ParityCount),
- true,
- )
-
- // Generate organized form sections using Bootstrap components
- html := `
-<div class="row">
- <div class="col-12">
- <div class="card mb-4">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-shield-alt me-2"></i>
- Erasure Coding Configuration
- </h5>
- </div>
- <div class="card-body">
-` + string(form.Build()) + `
- </div>
- </div>
- </div>
-</div>
-
-<div class="row">
- <div class="col-12">
- <div class="card mb-3">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-info-circle me-2"></i>
- Performance Impact
- </h5>
- </div>
- <div class="card-body">
- <div class="alert alert-info" role="alert">
- <h6 class="alert-heading">Important Notes:</h6>
- <p class="mb-2"><strong>Performance:</strong> Erasure coding is CPU and I/O intensive. Consider running during off-peak hours.</p>
- <p class="mb-0"><strong>Durability:</strong> With ` + fmt.Sprintf("%d+%d", config.ShardCount, config.ParityCount) + ` configuration, can tolerate up to ` + fmt.Sprintf("%d", config.ParityCount) + ` shard failures.</p>
- </div>
- </div>
- </div>
- </div>
-</div>`
-
- return template.HTML(html), nil
-}
-
-// ParseConfigForm parses form data into configuration
-func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) {
- config := &ErasureCodingConfig{}
-
- // Parse enabled
- config.Enabled = len(formData["enabled"]) > 0
-
- // Parse volume age hours
- if values, ok := formData["volume_age_hours_seconds"]; ok && len(values) > 0 {
- hours, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid volume age hours: %w", err)
- }
- config.VolumeAgeHoursSeconds = hours
- }
-
- // Parse scan interval
- if values, ok := formData["scan_interval_seconds"]; ok && len(values) > 0 {
- interval, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid scan interval: %w", err)
- }
- config.ScanIntervalSeconds = interval
- }
-
- // Parse max concurrent
- if values, ok := formData["max_concurrent"]; ok && len(values) > 0 {
- maxConcurrent, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid max concurrent: %w", err)
- }
- if maxConcurrent < 1 {
- return nil, fmt.Errorf("max concurrent must be at least 1")
- }
- config.MaxConcurrent = maxConcurrent
- }
-
- // Parse shard count
- if values, ok := formData["shard_count"]; ok && len(values) > 0 {
- shardCount, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid shard count: %w", err)
- }
- if shardCount < 1 {
- return nil, fmt.Errorf("shard count must be at least 1")
- }
- config.ShardCount = shardCount
- }
-
- // Parse parity count
- if values, ok := formData["parity_count"]; ok && len(values) > 0 {
- parityCount, err := strconv.Atoi(values[0])
- if err != nil {
- return nil, fmt.Errorf("invalid parity count: %w", err)
- }
- if parityCount < 1 {
- return nil, fmt.Errorf("parity count must be at least 1")
- }
- config.ParityCount = parityCount
- }
-
- return config, nil
-}
-
-// GetCurrentConfig returns the current configuration
-func (ui *UIProvider) GetCurrentConfig() interface{} {
- return ui.getCurrentECConfig()
-}
-
-// ApplyConfig applies the new configuration
-func (ui *UIProvider) ApplyConfig(config interface{}) error {
- ecConfig, ok := config.(ErasureCodingConfig)
- if !ok {
- return fmt.Errorf("invalid config type, expected ErasureCodingConfig")
- }
-
- // Apply to detector
- if ui.detector != nil {
- ui.detector.SetEnabled(ecConfig.Enabled)
- ui.detector.SetVolumeAgeHours(ecConfig.VolumeAgeHoursSeconds)
- ui.detector.SetScanInterval(secondsToDuration(ecConfig.ScanIntervalSeconds))
- }
-
- // Apply to scheduler
- if ui.scheduler != nil {
- ui.scheduler.SetEnabled(ecConfig.Enabled)
- ui.scheduler.SetMaxConcurrent(ecConfig.MaxConcurrent)
- }
-
- glog.V(1).Infof("Applied erasure coding configuration: enabled=%v, age_threshold=%v, max_concurrent=%d, shards=%d+%d",
- ecConfig.Enabled, ecConfig.VolumeAgeHoursSeconds, ecConfig.MaxConcurrent, ecConfig.ShardCount, ecConfig.ParityCount)
-
- return nil
-}
-
-// getCurrentECConfig gets the current configuration from detector and scheduler
-func (ui *UIProvider) getCurrentECConfig() ErasureCodingConfig {
- config := ErasureCodingConfig{
- // Default values (fallback if detectors/schedulers are nil)
- Enabled: true,
- VolumeAgeHoursSeconds: 24 * 3600, // 24 hours in seconds
- ScanIntervalSeconds: 2 * 3600, // 2 hours in seconds
- MaxConcurrent: 1,
- ShardCount: 10,
- ParityCount: 4,
- }
-
- // Get current values from detector
- if ui.detector != nil {
- config.Enabled = ui.detector.IsEnabled()
- config.VolumeAgeHoursSeconds = ui.detector.GetVolumeAgeHours()
- config.ScanIntervalSeconds = durationToSeconds(ui.detector.ScanInterval())
- }
-
- // Get current values from scheduler
- if ui.scheduler != nil {
- config.MaxConcurrent = ui.scheduler.GetMaxConcurrent()
- }
-
- return config
-}
-
-// RegisterUI registers the erasure coding UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *EcDetector, scheduler *Scheduler) {
- uiProvider := NewUIProvider(detector, scheduler)
- uiRegistry.RegisterUI(uiProvider)
-
- glog.V(1).Infof("✅ Registered erasure coding task UI provider")
-}
diff --git a/weed/worker/tasks/schema_provider.go b/weed/worker/tasks/schema_provider.go
new file mode 100644
index 000000000..4d69556b1
--- /dev/null
+++ b/weed/worker/tasks/schema_provider.go
@@ -0,0 +1,51 @@
+package tasks
+
+import (
+ "sync"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+)
+
+// TaskConfigSchema defines the schema for task configuration
+type TaskConfigSchema struct {
+ config.Schema // Embed common schema functionality
+ TaskName string `json:"task_name"`
+ DisplayName string `json:"display_name"`
+ Description string `json:"description"`
+ Icon string `json:"icon"`
+}
+
+// TaskConfigSchemaProvider is an interface for providing task configuration schemas
+type TaskConfigSchemaProvider interface {
+ GetConfigSchema() *TaskConfigSchema
+}
+
+// schemaRegistry maintains a registry of schema providers by task type
+type schemaRegistry struct {
+ providers map[string]TaskConfigSchemaProvider
+ mutex sync.RWMutex
+}
+
+var globalSchemaRegistry = &schemaRegistry{
+ providers: make(map[string]TaskConfigSchemaProvider),
+}
+
+// RegisterTaskConfigSchema registers a schema provider for a task type
+func RegisterTaskConfigSchema(taskType string, provider TaskConfigSchemaProvider) {
+ globalSchemaRegistry.mutex.Lock()
+ defer globalSchemaRegistry.mutex.Unlock()
+ globalSchemaRegistry.providers[taskType] = provider
+}
+
+// GetTaskConfigSchema returns the schema for the specified task type
+func GetTaskConfigSchema(taskType string) *TaskConfigSchema {
+ globalSchemaRegistry.mutex.RLock()
+ provider, exists := globalSchemaRegistry.providers[taskType]
+ globalSchemaRegistry.mutex.RUnlock()
+
+ if !exists {
+ return nil
+ }
+
+ return provider.GetConfigSchema()
+}
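+
+// Usage sketch (illustrative; vacuumSchemaProvider is a hypothetical name,
+// not part of this change). A task package implements
+// TaskConfigSchemaProvider and registers itself, typically from init:
+//
+//	type vacuumSchemaProvider struct{}
+//
+//	func (p *vacuumSchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
+//		return &tasks.TaskConfigSchema{TaskName: "vacuum", DisplayName: "Volume Vacuum"}
+//	}
+//
+//	func init() { tasks.RegisterTaskConfigSchema("vacuum", &vacuumSchemaProvider{}) }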
diff --git a/weed/worker/tasks/task.go b/weed/worker/tasks/task.go
index 482233f60..15369c137 100644
--- a/weed/worker/tasks/task.go
+++ b/weed/worker/tasks/task.go
@@ -2,29 +2,69 @@ package tasks
import (
"context"
+ "fmt"
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// BaseTask provides common functionality for all tasks
type BaseTask struct {
taskType types.TaskType
+ taskID string
progress float64
cancelled bool
mutex sync.RWMutex
startTime time.Time
estimatedDuration time.Duration
+ logger TaskLogger
+ loggerConfig TaskLoggerConfig
+ progressCallback func(float64) // Callback function for progress updates
}
// NewBaseTask creates a new base task
func NewBaseTask(taskType types.TaskType) *BaseTask {
return &BaseTask{
- taskType: taskType,
- progress: 0.0,
- cancelled: false,
+ taskType: taskType,
+ progress: 0.0,
+ cancelled: false,
+ loggerConfig: DefaultTaskLoggerConfig(),
+ }
+}
+
+// NewBaseTaskWithLogger creates a new base task with custom logger configuration
+func NewBaseTaskWithLogger(taskType types.TaskType, loggerConfig TaskLoggerConfig) *BaseTask {
+ return &BaseTask{
+ taskType: taskType,
+ progress: 0.0,
+ cancelled: false,
+ loggerConfig: loggerConfig,
+ }
+}
+
+// InitializeLogger initializes the task logger with task details
+func (t *BaseTask) InitializeLogger(taskID string, workerID string, params types.TaskParams) error {
+ return t.InitializeTaskLogger(taskID, workerID, params)
+}
+
+// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface)
+func (t *BaseTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+
+ t.taskID = taskID
+
+ logger, err := NewTaskLogger(taskID, t.taskType, workerID, params, t.loggerConfig)
+ if err != nil {
+ return fmt.Errorf("failed to initialize task logger: %w", err)
}
+
+ t.logger = logger
+ t.logger.Info("BaseTask initialized for task %s (type: %s)", taskID, t.taskType)
+
+ return nil
}
// Type returns the task type
@@ -39,24 +79,47 @@ func (t *BaseTask) GetProgress() float64 {
return t.progress
}
-// SetProgress sets the current progress
+// SetProgress sets the current progress and logs it
func (t *BaseTask) SetProgress(progress float64) {
t.mutex.Lock()
- defer t.mutex.Unlock()
if progress < 0 {
progress = 0
}
if progress > 100 {
progress = 100
}
+ oldProgress := t.progress
+	callback := t.progressCallback
+	logger := t.logger // snapshot under the lock, like the callback, to avoid a data race
t.progress = progress
+ t.mutex.Unlock()
+
+ // Log progress change
+	if logger != nil && progress != oldProgress {
+		logger.LogProgress(progress, fmt.Sprintf("Progress updated from %.1f%% to %.1f%%", oldProgress, progress))
+ }
+
+ // Call progress callback if set
+ if callback != nil && progress != oldProgress {
+ callback(progress)
+ }
}
// Cancel cancels the task
func (t *BaseTask) Cancel() error {
t.mutex.Lock()
defer t.mutex.Unlock()
+
+ if t.cancelled {
+ return nil
+ }
+
t.cancelled = true
+
+ if t.logger != nil {
+ t.logger.LogStatus("cancelled", "Task cancelled by request")
+ t.logger.Warning("Task %s was cancelled", t.taskID)
+ }
+
return nil
}
@@ -72,6 +135,10 @@ func (t *BaseTask) SetStartTime(startTime time.Time) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.startTime = startTime
+
+ if t.logger != nil {
+ t.logger.LogStatus("running", fmt.Sprintf("Task started at %s", startTime.Format(time.RFC3339)))
+ }
}
// GetStartTime returns the task start time
@@ -86,6 +153,13 @@ func (t *BaseTask) SetEstimatedDuration(duration time.Duration) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.estimatedDuration = duration
+
+ if t.logger != nil {
+ t.logger.LogWithFields("INFO", "Estimated duration set", map[string]interface{}{
+ "estimated_duration": duration.String(),
+ "estimated_seconds": duration.Seconds(),
+ })
+ }
}
// GetEstimatedDuration returns the estimated duration
@@ -95,11 +169,115 @@ func (t *BaseTask) GetEstimatedDuration() time.Duration {
return t.estimatedDuration
}
-// ExecuteTask is a wrapper that handles common task execution logic
+// SetProgressCallback sets the progress callback function
+func (t *BaseTask) SetProgressCallback(callback func(float64)) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.progressCallback = callback
+}
+
+// SetLoggerConfig sets the logger configuration for this task
+func (t *BaseTask) SetLoggerConfig(config TaskLoggerConfig) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.loggerConfig = config
+}
+
+// GetLogger returns the task logger
+func (t *BaseTask) GetLogger() TaskLogger {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+ return t.logger
+}
+
+// GetTaskLogger returns the task logger (LoggerProvider interface)
+func (t *BaseTask) GetTaskLogger() TaskLogger {
+ t.mutex.RLock()
+ defer t.mutex.RUnlock()
+ return t.logger
+}
+
+// LogInfo logs an info message
+func (t *BaseTask) LogInfo(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Info(message, args...)
+ }
+}
+
+// LogWarning logs a warning message
+func (t *BaseTask) LogWarning(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Warning(message, args...)
+ }
+}
+
+// LogError logs an error message
+func (t *BaseTask) LogError(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Error(message, args...)
+ }
+}
+
+// LogDebug logs a debug message
+func (t *BaseTask) LogDebug(message string, args ...interface{}) {
+ if t.logger != nil {
+ t.logger.Debug(message, args...)
+ }
+}
+
+// LogWithFields logs a message with structured fields
+func (t *BaseTask) LogWithFields(level string, message string, fields map[string]interface{}) {
+ if t.logger != nil {
+ t.logger.LogWithFields(level, message, fields)
+ }
+}
+
+// FinishTask finalizes the task and closes the logger
+func (t *BaseTask) FinishTask(success bool, errorMsg string) error {
+ if t.logger != nil {
+ if success {
+ t.logger.LogStatus("completed", "Task completed successfully")
+ t.logger.Info("Task %s finished successfully", t.taskID)
+ } else {
+ t.logger.LogStatus("failed", fmt.Sprintf("Task failed: %s", errorMsg))
+ t.logger.Error("Task %s failed: %s", t.taskID, errorMsg)
+ }
+
+ // Close logger
+ if err := t.logger.Close(); err != nil {
+ glog.Errorf("Failed to close task logger: %v", err)
+ }
+ }
+
+ return nil
+}
+
+// ExecuteTask is a wrapper that handles common task execution logic with logging
func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, executor func(context.Context, types.TaskParams) error) error {
+ // Initialize logger if not already done
+ if t.logger == nil {
+ // Generate a temporary task ID if none provided
+ if t.taskID == "" {
+ t.taskID = fmt.Sprintf("task_%d", time.Now().UnixNano())
+ }
+
+ workerID := "unknown"
+ if err := t.InitializeLogger(t.taskID, workerID, params); err != nil {
+ glog.Warningf("Failed to initialize task logger: %v", err)
+ }
+ }
+
t.SetStartTime(time.Now())
t.SetProgress(0)
+ if t.logger != nil {
+ t.logger.LogWithFields("INFO", "Task execution started", map[string]interface{}{
+ "volume_id": params.VolumeID,
+ "server": params.Server,
+ "collection": params.Collection,
+ })
+ }
+
// Create a context that can be cancelled
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -114,21 +292,29 @@ func (t *BaseTask) ExecuteTask(ctx context.Context, params types.TaskParams, exe
// Check cancellation every second
}
}
+ t.LogWarning("Task cancellation detected, cancelling context")
cancel()
}()
// Execute the actual task
+ t.LogInfo("Starting task executor")
err := executor(ctx, params)
if err != nil {
+ t.LogError("Task executor failed: %v", err)
+ t.FinishTask(false, err.Error())
return err
}
if t.IsCancelled() {
+ t.LogWarning("Task was cancelled during execution")
+ t.FinishTask(false, "cancelled")
return context.Canceled
}
t.SetProgress(100)
+ t.LogInfo("Task executor completed successfully")
+ t.FinishTask(true, "")
return nil
}
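+
+// Usage sketch (MyTask and the executor body are illustrative): concrete tasks
+// that embed *BaseTask wrap their implementation in ExecuteTask so logger
+// setup, progress clamping, cancellation polling, and FinishTask bookkeeping
+// are handled in one place:
+//
+//	func (t *MyTask) Execute(params types.TaskParams) error {
+//		return t.ExecuteTask(context.Background(), params,
+//			func(ctx context.Context, p types.TaskParams) error {
+//				t.SetProgress(50) // real work goes here
+//				return nil
+//			})
+//	}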
diff --git a/weed/worker/tasks/task_log_handler.go b/weed/worker/tasks/task_log_handler.go
new file mode 100644
index 000000000..be5f00f12
--- /dev/null
+++ b/weed/worker/tasks/task_log_handler.go
@@ -0,0 +1,230 @@
+package tasks
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+)
+
+// TaskLogHandler handles task log requests from admin server
+type TaskLogHandler struct {
+ baseLogDir string
+}
+
+// NewTaskLogHandler creates a new task log handler
+func NewTaskLogHandler(baseLogDir string) *TaskLogHandler {
+ if baseLogDir == "" {
+ baseLogDir = "/tmp/seaweedfs/task_logs"
+ }
+ return &TaskLogHandler{
+ baseLogDir: baseLogDir,
+ }
+}
+
+// HandleLogRequest processes a task log request and returns the response
+func (h *TaskLogHandler) HandleLogRequest(request *worker_pb.TaskLogRequest) *worker_pb.TaskLogResponse {
+ response := &worker_pb.TaskLogResponse{
+ TaskId: request.TaskId,
+ WorkerId: request.WorkerId,
+ Success: false,
+ }
+
+ // Find the task log directory
+ logDir, err := h.findTaskLogDirectory(request.TaskId)
+ if err != nil {
+ response.ErrorMessage = fmt.Sprintf("Task log directory not found: %v", err)
+ glog.Warningf("Task log request failed for %s: %v", request.TaskId, err)
+ return response
+ }
+
+ // Read metadata if requested
+ if request.IncludeMetadata {
+ metadata, err := h.readTaskMetadata(logDir)
+ if err != nil {
+ response.ErrorMessage = fmt.Sprintf("Failed to read task metadata: %v", err)
+ glog.Warningf("Failed to read metadata for task %s: %v", request.TaskId, err)
+ return response
+ }
+ response.Metadata = metadata
+ }
+
+ // Read log entries
+ logEntries, err := h.readTaskLogEntries(logDir, request)
+ if err != nil {
+ response.ErrorMessage = fmt.Sprintf("Failed to read task logs: %v", err)
+ glog.Warningf("Failed to read logs for task %s: %v", request.TaskId, err)
+ return response
+ }
+
+ response.LogEntries = logEntries
+ response.Success = true
+
+ glog.V(1).Infof("Successfully retrieved %d log entries for task %s", len(logEntries), request.TaskId)
+ return response
+}
+
+// findTaskLogDirectory searches for the task log directory by task ID
+func (h *TaskLogHandler) findTaskLogDirectory(taskID string) (string, error) {
+ entries, err := os.ReadDir(h.baseLogDir)
+ if err != nil {
+ return "", fmt.Errorf("failed to read base log directory: %w", err)
+ }
+
+ // Look for directories that start with the task ID
+ for _, entry := range entries {
+ if entry.IsDir() && strings.HasPrefix(entry.Name(), taskID+"_") {
+ return filepath.Join(h.baseLogDir, entry.Name()), nil
+ }
+ }
+
+ return "", fmt.Errorf("task log directory not found for task ID: %s", taskID)
+}
+
+// readTaskMetadata reads task metadata from the log directory
+func (h *TaskLogHandler) readTaskMetadata(logDir string) (*worker_pb.TaskLogMetadata, error) {
+ metadata, err := GetTaskLogMetadata(logDir)
+ if err != nil {
+ return nil, err
+ }
+
+ // Convert to protobuf metadata
+ pbMetadata := &worker_pb.TaskLogMetadata{
+ TaskId: metadata.TaskID,
+ TaskType: metadata.TaskType,
+ WorkerId: metadata.WorkerID,
+ StartTime: metadata.StartTime.Unix(),
+ Status: metadata.Status,
+ Progress: float32(metadata.Progress),
+ VolumeId: metadata.VolumeID,
+ Server: metadata.Server,
+ Collection: metadata.Collection,
+ LogFilePath: metadata.LogFilePath,
+ CreatedAt: metadata.CreatedAt.Unix(),
+ CustomData: make(map[string]string),
+ }
+
+ // Set end time and duration if available
+ if metadata.EndTime != nil {
+ pbMetadata.EndTime = metadata.EndTime.Unix()
+ }
+ if metadata.Duration != nil {
+ pbMetadata.DurationMs = metadata.Duration.Milliseconds()
+ }
+
+ // Convert custom data
+ for key, value := range metadata.CustomData {
+ if strValue, ok := value.(string); ok {
+ pbMetadata.CustomData[key] = strValue
+ } else {
+ pbMetadata.CustomData[key] = fmt.Sprintf("%v", value)
+ }
+ }
+
+ return pbMetadata, nil
+}
+
+// readTaskLogEntries reads and filters log entries based on the request
+func (h *TaskLogHandler) readTaskLogEntries(logDir string, request *worker_pb.TaskLogRequest) ([]*worker_pb.TaskLogEntry, error) {
+ entries, err := ReadTaskLogs(logDir)
+ if err != nil {
+ return nil, err
+ }
+
+ // Apply filters
+ var filteredEntries []TaskLogEntry
+
+ for _, entry := range entries {
+ // Filter by log level
+ if request.LogLevel != "" && !strings.EqualFold(entry.Level, request.LogLevel) {
+ continue
+ }
+
+ // Filter by time range
+ if request.StartTime > 0 && entry.Timestamp.Unix() < request.StartTime {
+ continue
+ }
+ if request.EndTime > 0 && entry.Timestamp.Unix() > request.EndTime {
+ continue
+ }
+
+ filteredEntries = append(filteredEntries, entry)
+ }
+
+ // Limit entries if requested
+ if request.MaxEntries > 0 && len(filteredEntries) > int(request.MaxEntries) {
+ // Take the most recent entries
+ start := len(filteredEntries) - int(request.MaxEntries)
+ filteredEntries = filteredEntries[start:]
+ }
+
+ // Convert to protobuf entries
+ var pbEntries []*worker_pb.TaskLogEntry
+ for _, entry := range filteredEntries {
+ pbEntry := &worker_pb.TaskLogEntry{
+ Timestamp: entry.Timestamp.Unix(),
+ Level: entry.Level,
+ Message: entry.Message,
+ Fields: make(map[string]string),
+ }
+
+ // Set progress if available
+ if entry.Progress != nil {
+ pbEntry.Progress = float32(*entry.Progress)
+ }
+
+ // Set status if available
+ if entry.Status != nil {
+ pbEntry.Status = *entry.Status
+ }
+
+ // Convert fields
+ for key, value := range entry.Fields {
+ if strValue, ok := value.(string); ok {
+ pbEntry.Fields[key] = strValue
+ } else {
+ pbEntry.Fields[key] = fmt.Sprintf("%v", value)
+ }
+ }
+
+ pbEntries = append(pbEntries, pbEntry)
+ }
+
+ return pbEntries, nil
+}
+
+// ListAvailableTaskLogs returns a list of available task log directories
+func (h *TaskLogHandler) ListAvailableTaskLogs() ([]string, error) {
+ entries, err := os.ReadDir(h.baseLogDir)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read base log directory: %w", err)
+ }
+
+ var taskDirs []string
+ for _, entry := range entries {
+ if entry.IsDir() {
+ taskDirs = append(taskDirs, entry.Name())
+ }
+ }
+
+ return taskDirs, nil
+}
+
+// CleanupOldLogs removes old task logs beyond the specified limit
+func (h *TaskLogHandler) CleanupOldLogs(maxTasks int) error {
+ config := TaskLoggerConfig{
+ BaseLogDir: h.baseLogDir,
+ MaxTasks: maxTasks,
+ }
+
+ // Create a temporary logger to trigger cleanup
+ tempLogger := &FileTaskLogger{
+ config: config,
+ }
+
+ tempLogger.cleanupOldLogs()
+ return nil
+}
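+
+// Usage sketch (request values are illustrative): the admin side builds a
+// TaskLogRequest; the handler locates the log directory, applies the filters,
+// and returns protobuf entries:
+//
+//	handler := NewTaskLogHandler("") // defaults to /tmp/seaweedfs/task_logs
+//	resp := handler.HandleLogRequest(&worker_pb.TaskLogRequest{
+//		TaskId:          "task_123",
+//		IncludeMetadata: true,
+//		LogLevel:        "ERROR", // optional level filter
+//		MaxEntries:      100,     // keep only the most recent 100
+//	})
+//	if !resp.Success {
+//		glog.Warningf("log fetch failed: %s", resp.ErrorMessage)
+//	}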
diff --git a/weed/worker/tasks/task_logger.go b/weed/worker/tasks/task_logger.go
new file mode 100644
index 000000000..e9c06c35c
--- /dev/null
+++ b/weed/worker/tasks/task_logger.go
@@ -0,0 +1,432 @@
+package tasks
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TaskLogger provides file-based logging for individual tasks
+type TaskLogger interface {
+ // Log methods
+ Info(message string, args ...interface{})
+ Warning(message string, args ...interface{})
+ Error(message string, args ...interface{})
+ Debug(message string, args ...interface{})
+
+ // Progress and status logging
+ LogProgress(progress float64, message string)
+ LogStatus(status string, message string)
+
+ // Structured logging
+ LogWithFields(level string, message string, fields map[string]interface{})
+
+ // Lifecycle
+ Close() error
+ GetLogDir() string
+}
+
+// LoggerProvider interface for tasks that support logging
+type LoggerProvider interface {
+ InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error
+ GetTaskLogger() TaskLogger
+}
+
+// TaskLoggerConfig holds configuration for task logging
+type TaskLoggerConfig struct {
+ BaseLogDir string
+ MaxTasks int // Maximum number of task logs to keep
+ MaxLogSizeMB int // Maximum log file size in MB
+ EnableConsole bool // Also log to console
+}
+
+// FileTaskLogger implements TaskLogger using file-based logging
+type FileTaskLogger struct {
+ taskID string
+ taskType types.TaskType
+ workerID string
+ logDir string
+ logFile *os.File
+ mutex sync.Mutex
+ config TaskLoggerConfig
+ metadata *TaskLogMetadata
+ closed bool
+}
+
+// TaskLogMetadata contains metadata about the task execution
+type TaskLogMetadata struct {
+ TaskID string `json:"task_id"`
+ TaskType string `json:"task_type"`
+ WorkerID string `json:"worker_id"`
+ StartTime time.Time `json:"start_time"`
+ EndTime *time.Time `json:"end_time,omitempty"`
+ Duration *time.Duration `json:"duration,omitempty"`
+ Status string `json:"status"`
+ Progress float64 `json:"progress"`
+ VolumeID uint32 `json:"volume_id,omitempty"`
+ Server string `json:"server,omitempty"`
+ Collection string `json:"collection,omitempty"`
+ CustomData map[string]interface{} `json:"custom_data,omitempty"`
+ LogFilePath string `json:"log_file_path"`
+ CreatedAt time.Time `json:"created_at"`
+}
+
+// TaskLogEntry represents a single log entry
+type TaskLogEntry struct {
+ Timestamp time.Time `json:"timestamp"`
+ Level string `json:"level"`
+ Message string `json:"message"`
+ Fields map[string]interface{} `json:"fields,omitempty"`
+ Progress *float64 `json:"progress,omitempty"`
+ Status *string `json:"status,omitempty"`
+}
+
+// DefaultTaskLoggerConfig returns default configuration
+func DefaultTaskLoggerConfig() TaskLoggerConfig {
+ return TaskLoggerConfig{
+ BaseLogDir: "/data/task_logs", // Use persistent data directory
+ MaxTasks: 100, // Keep last 100 task logs
+ MaxLogSizeMB: 10,
+ EnableConsole: true,
+ }
+}
+
+// NewTaskLogger creates a new file-based task logger
+func NewTaskLogger(taskID string, taskType types.TaskType, workerID string, params types.TaskParams, config TaskLoggerConfig) (TaskLogger, error) {
+ // Create unique directory name with timestamp
+ timestamp := time.Now().Format("20060102_150405")
+ dirName := fmt.Sprintf("%s_%s_%s_%s", taskID, taskType, workerID, timestamp)
+ logDir := filepath.Join(config.BaseLogDir, dirName)
+
+ // Create log directory
+ if err := os.MkdirAll(logDir, 0755); err != nil {
+ return nil, fmt.Errorf("failed to create log directory %s: %w", logDir, err)
+ }
+
+ // Create log file
+ logFilePath := filepath.Join(logDir, "task.log")
+ logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create log file %s: %w", logFilePath, err)
+ }
+
+ // Create metadata
+ metadata := &TaskLogMetadata{
+ TaskID: taskID,
+ TaskType: string(taskType),
+ WorkerID: workerID,
+ StartTime: time.Now(),
+ Status: "started",
+ Progress: 0.0,
+ VolumeID: params.VolumeID,
+ Server: params.Server,
+ Collection: params.Collection,
+ CustomData: make(map[string]interface{}),
+ LogFilePath: logFilePath,
+ CreatedAt: time.Now(),
+ }
+
+ logger := &FileTaskLogger{
+ taskID: taskID,
+ taskType: taskType,
+ workerID: workerID,
+ logDir: logDir,
+ logFile: logFile,
+ config: config,
+ metadata: metadata,
+ closed: false,
+ }
+
+ // Write initial log entry
+ logger.Info("Task logger initialized for %s (type: %s, worker: %s)", taskID, taskType, workerID)
+ logger.LogWithFields("INFO", "Task parameters", map[string]interface{}{
+ "volume_id": params.VolumeID,
+ "server": params.Server,
+ "collection": params.Collection,
+ })
+
+ // Save initial metadata
+ if err := logger.saveMetadata(); err != nil {
+ glog.Warningf("Failed to save initial task metadata: %v", err)
+ }
+
+ // Clean up old task logs
+ go logger.cleanupOldLogs()
+
+ return logger, nil
+}
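+
+// Resulting on-disk layout (illustrative):
+//
+//	<BaseLogDir>/<taskID>_<taskType>_<workerID>_<timestamp>/
+//	├── task.log       (JSON lines, one TaskLogEntry per line)
+//	└── metadata.json  (TaskLogMetadata, rewritten on progress/status changes)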
+
+// Info logs an info message
+func (l *FileTaskLogger) Info(message string, args ...interface{}) {
+ l.log("INFO", message, args...)
+}
+
+// Warning logs a warning message
+func (l *FileTaskLogger) Warning(message string, args ...interface{}) {
+ l.log("WARNING", message, args...)
+}
+
+// Error logs an error message
+func (l *FileTaskLogger) Error(message string, args ...interface{}) {
+ l.log("ERROR", message, args...)
+}
+
+// Debug logs a debug message
+func (l *FileTaskLogger) Debug(message string, args ...interface{}) {
+ l.log("DEBUG", message, args...)
+}
+
+// LogProgress logs task progress
+func (l *FileTaskLogger) LogProgress(progress float64, message string) {
+ l.mutex.Lock()
+ l.metadata.Progress = progress
+ l.mutex.Unlock()
+
+ entry := TaskLogEntry{
+ Timestamp: time.Now(),
+ Level: "INFO",
+ Message: message,
+ Progress: &progress,
+ }
+
+ l.writeLogEntry(entry)
+ l.saveMetadata() // Update metadata with new progress
+}
+
+// LogStatus logs task status change
+func (l *FileTaskLogger) LogStatus(status string, message string) {
+ l.mutex.Lock()
+ l.metadata.Status = status
+ l.mutex.Unlock()
+
+ entry := TaskLogEntry{
+ Timestamp: time.Now(),
+ Level: "INFO",
+ Message: message,
+ Status: &status,
+ }
+
+ l.writeLogEntry(entry)
+ l.saveMetadata() // Update metadata with new status
+}
+
+// LogWithFields logs a message with structured fields
+func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[string]interface{}) {
+ entry := TaskLogEntry{
+ Timestamp: time.Now(),
+ Level: level,
+ Message: message,
+ Fields: fields,
+ }
+
+ l.writeLogEntry(entry)
+}
+
+// Close closes the logger and finalizes metadata
+func (l *FileTaskLogger) Close() error {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+
+ if l.closed {
+ return nil
+ }
+
+ // Finalize metadata
+ endTime := time.Now()
+ duration := endTime.Sub(l.metadata.StartTime)
+ l.metadata.EndTime = &endTime
+ l.metadata.Duration = &duration
+
+ if l.metadata.Status == "started" {
+ l.metadata.Status = "completed"
+ }
+
+ // Save final metadata
+ l.saveMetadata()
+
+ // Close log file
+ if l.logFile != nil {
+ if err := l.logFile.Close(); err != nil {
+ return fmt.Errorf("failed to close log file: %w", err)
+ }
+ }
+
+	l.closed = true
+	// Log via glog here: calling l.Info would re-enter writeLogEntry, which
+	// locks l.mutex (already held by Close) and would drop the entry anyway
+	// now that closed is set.
+	glog.V(1).Infof("Task logger closed for %s", l.taskID)
+
+ return nil
+}
+
+// GetLogDir returns the log directory path
+func (l *FileTaskLogger) GetLogDir() string {
+ return l.logDir
+}
+
+// log is the internal logging method
+func (l *FileTaskLogger) log(level string, message string, args ...interface{}) {
+ formattedMessage := fmt.Sprintf(message, args...)
+
+ entry := TaskLogEntry{
+ Timestamp: time.Now(),
+ Level: level,
+ Message: formattedMessage,
+ }
+
+ l.writeLogEntry(entry)
+}
+
+// writeLogEntry writes a log entry to the file
+func (l *FileTaskLogger) writeLogEntry(entry TaskLogEntry) {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+
+ if l.closed || l.logFile == nil {
+ return
+ }
+
+ // Format as JSON line
+ jsonData, err := json.Marshal(entry)
+ if err != nil {
+ glog.Errorf("Failed to marshal log entry: %v", err)
+ return
+ }
+
+ // Write to file
+ if _, err := l.logFile.WriteString(string(jsonData) + "\n"); err != nil {
+ glog.Errorf("Failed to write log entry: %v", err)
+ return
+ }
+
+ // Flush to disk
+ if err := l.logFile.Sync(); err != nil {
+ glog.Errorf("Failed to sync log file: %v", err)
+ }
+
+ // Also log to console and stderr if enabled
+ if l.config.EnableConsole {
+		// Log to glog with a call depth that attributes the entry to the
+		// original caller: depth 3 skips writeLogEntry -> log -> Info/Warning/Error.
+ formattedMsg := fmt.Sprintf("[TASK-%s] %s: %s", l.taskID, entry.Level, entry.Message)
+ switch entry.Level {
+ case "ERROR":
+ glog.ErrorDepth(3, formattedMsg)
+ case "WARNING":
+ glog.WarningDepth(3, formattedMsg)
+ default: // INFO, DEBUG, etc.
+ glog.InfoDepth(3, formattedMsg)
+ }
+ // Also log to stderr for immediate visibility
+ fmt.Fprintf(os.Stderr, "[TASK-%s] %s: %s\n", l.taskID, entry.Level, entry.Message)
+ }
+}
+
+// saveMetadata saves task metadata to file
+func (l *FileTaskLogger) saveMetadata() error {
+ metadataPath := filepath.Join(l.logDir, "metadata.json")
+
+ data, err := json.MarshalIndent(l.metadata, "", " ")
+ if err != nil {
+ return fmt.Errorf("failed to marshal metadata: %w", err)
+ }
+
+ return os.WriteFile(metadataPath, data, 0644)
+}
+
+// cleanupOldLogs removes old task log directories to maintain the limit
+func (l *FileTaskLogger) cleanupOldLogs() {
+ baseDir := l.config.BaseLogDir
+
+ entries, err := os.ReadDir(baseDir)
+ if err != nil {
+ glog.Warningf("Failed to read log directory %s: %v", baseDir, err)
+ return
+ }
+
+ // Filter for directories only
+ var dirs []os.DirEntry
+ for _, entry := range entries {
+ if entry.IsDir() {
+ dirs = append(dirs, entry)
+ }
+ }
+
+ // If we're under the limit, nothing to clean
+ if len(dirs) <= l.config.MaxTasks {
+ return
+ }
+
+ // Sort by modification time (oldest first)
+ sort.Slice(dirs, func(i, j int) bool {
+ infoI, errI := dirs[i].Info()
+ infoJ, errJ := dirs[j].Info()
+ if errI != nil || errJ != nil {
+ return false
+ }
+ return infoI.ModTime().Before(infoJ.ModTime())
+ })
+
+ // Remove oldest directories
+ numToRemove := len(dirs) - l.config.MaxTasks
+ for i := 0; i < numToRemove; i++ {
+ dirPath := filepath.Join(baseDir, dirs[i].Name())
+ if err := os.RemoveAll(dirPath); err != nil {
+ glog.Warningf("Failed to remove old log directory %s: %v", dirPath, err)
+ } else {
+ glog.V(1).Infof("Cleaned up old task log directory: %s", dirPath)
+ }
+ }
+
+ glog.V(1).Infof("Task log cleanup completed: removed %d old directories", numToRemove)
+}
+
+// GetTaskLogMetadata reads metadata from a task log directory
+func GetTaskLogMetadata(logDir string) (*TaskLogMetadata, error) {
+ metadataPath := filepath.Join(logDir, "metadata.json")
+
+ data, err := os.ReadFile(metadataPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read metadata file: %w", err)
+ }
+
+ var metadata TaskLogMetadata
+ if err := json.Unmarshal(data, &metadata); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
+ }
+
+ return &metadata, nil
+}
+
+// ReadTaskLogs reads all log entries from a task log file
+func ReadTaskLogs(logDir string) ([]TaskLogEntry, error) {
+ logPath := filepath.Join(logDir, "task.log")
+
+ file, err := os.Open(logPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open log file: %w", err)
+ }
+ defer file.Close()
+
+ var entries []TaskLogEntry
+ decoder := json.NewDecoder(file)
+
+ for {
+ var entry TaskLogEntry
+ if err := decoder.Decode(&entry); err != nil {
+ if err == io.EOF {
+ break
+ }
+ return nil, fmt.Errorf("failed to decode log entry: %w", err)
+ }
+ entries = append(entries, entry)
+ }
+
+ return entries, nil
+}
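+
+// Example (values illustrative): writeLogEntry emits newline-delimited JSON,
+// so a task.log line looks like
+//
+//	{"timestamp":"2006-01-02T15:04:05Z","level":"INFO","message":"Starting task executor"}
+//
+// and a directory written by FileTaskLogger can be read back with:
+//
+//	entries, err := ReadTaskLogs(logDir)
+//	if err == nil {
+//		meta, _ := GetTaskLogMetadata(logDir)
+//		glog.Infof("task %s: %d log entries", meta.TaskID, len(entries))
+//	}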
diff --git a/weed/worker/tasks/ui_base.go b/weed/worker/tasks/ui_base.go
new file mode 100644
index 000000000..ac22c20c4
--- /dev/null
+++ b/weed/worker/tasks/ui_base.go
@@ -0,0 +1,184 @@
+package tasks
+
+import (
+ "reflect"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// BaseUIProvider provides common UIProvider functionality for all tasks
+type BaseUIProvider struct {
+ taskType types.TaskType
+ displayName string
+ description string
+ icon string
+ schemaFunc func() *TaskConfigSchema
+ configFunc func() types.TaskConfig
+ applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error
+ applyTaskConfigFunc func(config types.TaskConfig) error
+}
+
+// NewBaseUIProvider creates a new base UI provider
+func NewBaseUIProvider(
+ taskType types.TaskType,
+ displayName string,
+ description string,
+ icon string,
+ schemaFunc func() *TaskConfigSchema,
+ configFunc func() types.TaskConfig,
+ applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error,
+ applyTaskConfigFunc func(config types.TaskConfig) error,
+) *BaseUIProvider {
+ return &BaseUIProvider{
+ taskType: taskType,
+ displayName: displayName,
+ description: description,
+ icon: icon,
+ schemaFunc: schemaFunc,
+ configFunc: configFunc,
+ applyTaskPolicyFunc: applyTaskPolicyFunc,
+ applyTaskConfigFunc: applyTaskConfigFunc,
+ }
+}
+
+// GetTaskType returns the task type
+func (ui *BaseUIProvider) GetTaskType() types.TaskType {
+ return ui.taskType
+}
+
+// GetDisplayName returns the human-readable name
+func (ui *BaseUIProvider) GetDisplayName() string {
+ return ui.displayName
+}
+
+// GetDescription returns a description of what this task does
+func (ui *BaseUIProvider) GetDescription() string {
+ return ui.description
+}
+
+// GetIcon returns the icon CSS class for this task type
+func (ui *BaseUIProvider) GetIcon() string {
+ return ui.icon
+}
+
+// GetCurrentConfig returns the current configuration as TaskConfig
+func (ui *BaseUIProvider) GetCurrentConfig() types.TaskConfig {
+ return ui.configFunc()
+}
+
+// ApplyTaskPolicy applies protobuf TaskPolicy configuration
+func (ui *BaseUIProvider) ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return ui.applyTaskPolicyFunc(policy)
+}
+
+// ApplyTaskConfig applies TaskConfig interface configuration
+func (ui *BaseUIProvider) ApplyTaskConfig(config types.TaskConfig) error {
+ return ui.applyTaskConfigFunc(config)
+}
+
+// CommonConfigGetter provides a common pattern for getting current configuration
+type CommonConfigGetter[T any] struct {
+ defaultConfig T
+ detectorFunc func() T
+ schedulerFunc func() T
+}
+
+// NewCommonConfigGetter creates a new common config getter
+func NewCommonConfigGetter[T any](
+ defaultConfig T,
+ detectorFunc func() T,
+ schedulerFunc func() T,
+) *CommonConfigGetter[T] {
+ return &CommonConfigGetter[T]{
+ defaultConfig: defaultConfig,
+ detectorFunc: detectorFunc,
+ schedulerFunc: schedulerFunc,
+ }
+}
+
+// GetConfig returns the merged configuration
+func (cg *CommonConfigGetter[T]) GetConfig() T {
+ config := cg.defaultConfig
+
+ // Apply detector values if available
+ if cg.detectorFunc != nil {
+ detectorConfig := cg.detectorFunc()
+ mergeConfigs(&config, detectorConfig)
+ }
+
+ // Apply scheduler values if available
+ if cg.schedulerFunc != nil {
+ schedulerConfig := cg.schedulerFunc()
+ mergeConfigs(&config, schedulerConfig)
+ }
+
+ return config
+}
+
+// mergeConfigs merges non-zero values from source into dest
+func mergeConfigs[T any](dest *T, source T) {
+ destValue := reflect.ValueOf(dest).Elem()
+ sourceValue := reflect.ValueOf(source)
+
+ if destValue.Kind() != reflect.Struct || sourceValue.Kind() != reflect.Struct {
+ return
+ }
+
+ for i := 0; i < destValue.NumField(); i++ {
+ destField := destValue.Field(i)
+ sourceField := sourceValue.Field(i)
+
+ if !destField.CanSet() {
+ continue
+ }
+
+ // Only copy non-zero values
+ if !sourceField.IsZero() {
+ if destField.Type() == sourceField.Type() {
+ destField.Set(sourceField)
+ }
+ }
+ }
+}
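+
+// Merge semantics (illustrative): zero values in source are skipped, so a
+// detector or scheduler only overrides fields it actually sets. Note the
+// corollary: a bool field can never be merged back to false this way.
+//
+//	type cfg struct{ Enabled bool; MaxConcurrent int }
+//	dst := cfg{Enabled: true, MaxConcurrent: 2}
+//	mergeConfigs(&dst, cfg{MaxConcurrent: 5})
+//	// dst is now {Enabled: true, MaxConcurrent: 5}; Enabled was untouched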
+
+// RegisterUIFunc provides a common registration function signature
+type RegisterUIFunc[D, S any] func(uiRegistry *types.UIRegistry, detector D, scheduler S)
+
+// CommonRegisterUI provides a common registration implementation
+func CommonRegisterUI[D, S any](
+ taskType types.TaskType,
+ displayName string,
+ uiRegistry *types.UIRegistry,
+ detector D,
+ scheduler S,
+ schemaFunc func() *TaskConfigSchema,
+ configFunc func() types.TaskConfig,
+ applyTaskPolicyFunc func(policy *worker_pb.TaskPolicy) error,
+ applyTaskConfigFunc func(config types.TaskConfig) error,
+) {
+ // Get metadata from schema
+ schema := schemaFunc()
+ description := "Task configuration"
+ icon := "fas fa-cog"
+
+ if schema != nil {
+ description = schema.Description
+ icon = schema.Icon
+ }
+
+ uiProvider := NewBaseUIProvider(
+ taskType,
+ displayName,
+ description,
+ icon,
+ schemaFunc,
+ configFunc,
+ applyTaskPolicyFunc,
+ applyTaskConfigFunc,
+ )
+
+ uiRegistry.RegisterUI(uiProvider)
+ glog.V(1).Infof("✅ Registered %s task UI provider", taskType)
+}
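+
+// Registration sketch (accessor names are hypothetical): a task package passes
+// its schema/config accessors plus apply callbacks and lets the shared
+// provider implement the UIProvider surface:
+//
+//	tasks.CommonRegisterUI(types.TaskTypeVacuum, "Volume Vacuum", uiRegistry,
+//		detector, scheduler,
+//		getConfigSchema, getCurrentConfig, applyTaskPolicy, applyTaskConfig)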
diff --git a/weed/worker/tasks/vacuum/config.go b/weed/worker/tasks/vacuum/config.go
new file mode 100644
index 000000000..fe8c0e8c5
--- /dev/null
+++ b/weed/worker/tasks/vacuum/config.go
@@ -0,0 +1,190 @@
+package vacuum
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with vacuum-specific settings
+type Config struct {
+ base.BaseConfig
+ GarbageThreshold float64 `json:"garbage_threshold"`
+ MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
+ MinIntervalSeconds int `json:"min_interval_seconds"`
+}
+
+// NewDefaultConfig creates a new default vacuum configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
+ MaxConcurrent: 2,
+ },
+ GarbageThreshold: 0.3, // 30%
+ MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ }
+}
+
+// ToTaskPolicy converts configuration to a TaskPolicy protobuf message
+func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+ VacuumConfig: &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: float64(c.GarbageThreshold),
+ MinVolumeAgeHours: int32(c.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
+ MinIntervalSeconds: int32(c.MinIntervalSeconds),
+ },
+ },
+ }
+}
+
+// FromTaskPolicy loads configuration from a TaskPolicy protobuf message
+func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+
+ // Set general TaskPolicy fields
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) // Direct seconds-to-seconds mapping
+
+ // Set vacuum-specific fields from the task config
+ if vacuumConfig := policy.GetVacuumConfig(); vacuumConfig != nil {
+ c.GarbageThreshold = float64(vacuumConfig.GarbageThreshold)
+ c.MinVolumeAgeSeconds = int(vacuumConfig.MinVolumeAgeHours * 3600) // Convert hours to seconds
+ c.MinIntervalSeconds = int(vacuumConfig.MinIntervalSeconds)
+ }
+
+ return nil
+}
+
+// LoadConfigFromPersistence loads configuration from the persistence layer if available
+func LoadConfigFromPersistence(configPersistence interface{}) *Config {
+ config := NewDefaultConfig()
+
+ // Try to load from persistence if available
+ if persistence, ok := configPersistence.(interface {
+ LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, error)
+ }); ok {
+ if policy, err := persistence.LoadVacuumTaskPolicy(); err == nil && policy != nil {
+ if err := config.FromTaskPolicy(policy); err == nil {
+ glog.V(1).Infof("Loaded vacuum configuration from persistence")
+ return config
+ }
+ }
+ }
+
+ glog.V(1).Infof("Using default vacuum configuration")
+ return config
+}
+
+// GetConfigSpec returns the configuration schema for vacuum tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Vacuum Tasks",
+ Description: "Whether vacuum tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic vacuum task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 2 * 60 * 60,
+ MinValue: 10 * 60,
+ MaxValue: 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing vacuum",
+ HelpText: "The system will check for volumes that need vacuuming at this interval",
+ Placeholder: "2",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 2,
+ MinValue: 1,
+ MaxValue: 10,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of vacuum tasks that can run simultaneously",
+ HelpText: "Limits the number of vacuum operations running at the same time to control system load",
+ Placeholder: "2 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "garbage_threshold",
+ JSONName: "garbage_threshold",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.3,
+ MinValue: 0.0,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Garbage Percentage Threshold",
+ Description: "Trigger vacuum when garbage ratio exceeds this percentage",
+ HelpText: "Volumes with more deleted content than this threshold will be vacuumed",
+ Placeholder: "0.30 (30%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_volume_age_seconds",
+ JSONName: "min_volume_age_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 24 * 60 * 60,
+ MinValue: 1 * 60 * 60,
+ MaxValue: 7 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Volume Age",
+ Description: "Only vacuum volumes older than this duration",
+ HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to",
+ Placeholder: "24",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_interval_seconds",
+ JSONName: "min_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 7 * 24 * 60 * 60,
+ MinValue: 1 * 24 * 60 * 60,
+ MaxValue: 30 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Interval",
+ Description: "Minimum time between vacuum operations on the same volume",
+ HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
+ Placeholder: "7",
+ Unit: config.UnitDays,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
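+
+// Round-trip note (sketch): MinVolumeAgeSeconds travels through the protobuf
+// as whole hours (MinVolumeAgeHours), so sub-hour values are truncated:
+//
+//	c := NewDefaultConfig()
+//	c.MinVolumeAgeSeconds = 90 * 60 // 1.5h
+//	restored := NewDefaultConfig()
+//	_ = restored.FromTaskPolicy(c.ToTaskPolicy())
+//	// restored.MinVolumeAgeSeconds == 3600 (1h), not 5400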
diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go
new file mode 100644
index 000000000..7b5a1baf0
--- /dev/null
+++ b/weed/worker/tasks/vacuum/detection.go
@@ -0,0 +1,112 @@
+package vacuum
+
+import (
+	"fmt"
+	"time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for vacuum tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+	vacuumConfig, ok := config.(*Config)
+	if !ok {
+		return nil, fmt.Errorf("unexpected config type %T for vacuum detection", config)
+	}
+ var results []*types.TaskDetectionResult
+ minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
+
+	skippedDueToGarbage := 0
+	skippedDueToAge := 0
+
+ for _, metric := range metrics {
+ // Check if volume needs vacuum
+ if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
+ priority := types.TaskPriorityNormal
+ if metric.GarbageRatio > 0.6 {
+ priority = types.TaskPriorityHigh
+ }
+
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeVacuum,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: priority,
+ Reason: "Volume has excessive garbage requiring vacuum",
+ ScheduleAt: time.Now(),
+ }
+ results = append(results, result)
+		} else {
+			// Track why the volume was skipped so the summary log reports accurate counts
+			if metric.GarbageRatio < vacuumConfig.GarbageThreshold {
+				skippedDueToGarbage++
+			}
+			if metric.Age < minVolumeAge {
+				skippedDueToAge++
+			}
+		}
+ }
+
+ // Log debug summary if no tasks were created
+ if len(results) == 0 && len(metrics) > 0 {
+ totalVolumes := len(metrics)
+ glog.Infof("VACUUM: No tasks created for %d volumes. Threshold=%.2f%%, MinAge=%s. Skipped: %d (garbage<threshold), %d (age<minimum)",
+ totalVolumes, vacuumConfig.GarbageThreshold*100, minVolumeAge, skippedDueToGarbage, skippedDueToAge)
+
+ // Show details for first few volumes
+ for i, metric := range metrics {
+ if i >= 3 { // Limit to first 3 volumes
+ break
+ }
+ glog.Infof("VACUUM: Volume %d: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)",
+ metric.VolumeID, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100,
+ metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute))
+ }
+ }
+
+ return results, nil
+}
+
+// Scheduling implements the scheduling logic for vacuum tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+	vacuumConfig, ok := config.(*Config)
+	if !ok {
+		glog.Errorf("vacuum scheduling: unexpected config type %T", config)
+		return false
+	}
+
+ // Count running vacuum tasks
+ runningVacuumCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeVacuum {
+ runningVacuumCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningVacuumCount >= vacuumConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check for available workers with vacuum capability
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeVacuum {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// CreateTask creates a new vacuum task instance
+func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Create and return the vacuum task using existing Task type
+ return NewTask(params.Server, params.VolumeID), nil
+}
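+
+// Wiring sketch (metrics and clusterInfo come from the caller; assumes *Config
+// satisfies base.TaskConfig via the embedded BaseConfig): Detection,
+// Scheduling, and CreateTask are plain functions so generic task plumbing can
+// drive them with a persisted config:
+//
+//	cfg := LoadConfigFromPersistence(nil) // nil falls back to defaults
+//	results, _ := Detection(metrics, clusterInfo, cfg)
+//	for _, r := range results {
+//		glog.V(1).Infof("vacuum candidate: volume %d on %s (%s)", r.VolumeID, r.Server, r.Reason)
+//	}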
diff --git a/weed/worker/tasks/vacuum/ui.go b/weed/worker/tasks/vacuum/ui.go
deleted file mode 100644
index 6f67a801a..000000000
--- a/weed/worker/tasks/vacuum/ui.go
+++ /dev/null
@@ -1,314 +0,0 @@
-package vacuum
-
-import (
- "fmt"
- "html/template"
- "strconv"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// UIProvider provides the UI for vacuum task configuration
-type UIProvider struct {
- detector *VacuumDetector
- scheduler *VacuumScheduler
-}
-
-// NewUIProvider creates a new vacuum UI provider
-func NewUIProvider(detector *VacuumDetector, scheduler *VacuumScheduler) *UIProvider {
- return &UIProvider{
- detector: detector,
- scheduler: scheduler,
- }
-}
-
-// GetTaskType returns the task type
-func (ui *UIProvider) GetTaskType() types.TaskType {
- return types.TaskTypeVacuum
-}
-
-// GetDisplayName returns the human-readable name
-func (ui *UIProvider) GetDisplayName() string {
- return "Volume Vacuum"
-}
-
-// GetDescription returns a description of what this task does
-func (ui *UIProvider) GetDescription() string {
- return "Reclaims disk space by removing deleted files from volumes"
-}
-
-// GetIcon returns the icon CSS class for this task type
-func (ui *UIProvider) GetIcon() string {
- return "fas fa-broom text-primary"
-}
-
-// VacuumConfig represents the vacuum configuration
-type VacuumConfig struct {
- Enabled bool `json:"enabled"`
- GarbageThreshold float64 `json:"garbage_threshold"`
- ScanIntervalSeconds int `json:"scan_interval_seconds"`
- MaxConcurrent int `json:"max_concurrent"`
- MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
- MinIntervalSeconds int `json:"min_interval_seconds"`
-}
-
-// Helper functions for duration conversion
-func secondsToDuration(seconds int) time.Duration {
- return time.Duration(seconds) * time.Second
-}
-
-func durationToSeconds(d time.Duration) int {
- return int(d.Seconds())
-}
-
-// formatDurationForUser formats seconds as a user-friendly duration string
-func formatDurationForUser(seconds int) string {
- d := secondsToDuration(seconds)
- if d < time.Minute {
- return fmt.Sprintf("%ds", seconds)
- }
- if d < time.Hour {
- return fmt.Sprintf("%.0fm", d.Minutes())
- }
- if d < 24*time.Hour {
- return fmt.Sprintf("%.1fh", d.Hours())
- }
- return fmt.Sprintf("%.1fd", d.Hours()/24)
-}
-
-// RenderConfigForm renders the configuration form HTML
-func (ui *UIProvider) RenderConfigForm(currentConfig interface{}) (template.HTML, error) {
- config := ui.getCurrentVacuumConfig()
-
- // Build form using the FormBuilder helper
- form := types.NewFormBuilder()
-
- // Detection Settings
- form.AddCheckboxField(
- "enabled",
- "Enable Vacuum Tasks",
- "Whether vacuum tasks should be automatically created",
- config.Enabled,
- )
-
- form.AddNumberField(
- "garbage_threshold",
- "Garbage Threshold (%)",
- "Trigger vacuum when garbage ratio exceeds this percentage (0.0-1.0)",
- config.GarbageThreshold,
- true,
- )
-
- form.AddDurationField(
- "scan_interval",
- "Scan Interval",
- "How often to scan for volumes needing vacuum",
- secondsToDuration(config.ScanIntervalSeconds),
- true,
- )
-
- form.AddDurationField(
- "min_volume_age",
- "Minimum Volume Age",
- "Only vacuum volumes older than this duration",
- secondsToDuration(config.MinVolumeAgeSeconds),
- true,
- )
-
- // Scheduling Settings
- form.AddNumberField(
- "max_concurrent",
- "Max Concurrent Tasks",
- "Maximum number of vacuum tasks that can run simultaneously",
- float64(config.MaxConcurrent),
- true,
- )
-
- form.AddDurationField(
- "min_interval",
- "Minimum Interval",
- "Minimum time between vacuum operations on the same volume",
- secondsToDuration(config.MinIntervalSeconds),
- true,
- )
-
- // Generate organized form sections using Bootstrap components
- html := `
-<div class="row">
- <div class="col-12">
- <div class="card mb-4">
- <div class="card-header">
- <h5 class="mb-0">
- <i class="fas fa-search me-2"></i>
- Detection Settings
- </h5>
- </div>
- <div class="card-body">
-` + string(form.Build()) + `
- </div>
- </div>
- </div>
-</div>
-
-<script>
-function resetForm() {
- if (confirm('Reset all vacuum settings to defaults?')) {
- // Reset to default values
- document.querySelector('input[name="enabled"]').checked = true;
- document.querySelector('input[name="garbage_threshold"]').value = '0.3';
- document.querySelector('input[name="scan_interval"]').value = '30m';
- document.querySelector('input[name="min_volume_age"]').value = '1h';
- document.querySelector('input[name="max_concurrent"]').value = '2';
- document.querySelector('input[name="min_interval"]').value = '6h';
- }
-}
-</script>
-`
-
- return template.HTML(html), nil
-}
-
-// ParseConfigForm parses form data into configuration
-func (ui *UIProvider) ParseConfigForm(formData map[string][]string) (interface{}, error) {
- config := &VacuumConfig{}
-
- // Parse enabled checkbox
- config.Enabled = len(formData["enabled"]) > 0 && formData["enabled"][0] == "on"
-
- // Parse garbage threshold
- if thresholdStr := formData["garbage_threshold"]; len(thresholdStr) > 0 {
- if threshold, err := strconv.ParseFloat(thresholdStr[0], 64); err != nil {
- return nil, fmt.Errorf("invalid garbage threshold: %w", err)
- } else if threshold < 0 || threshold > 1 {
- return nil, fmt.Errorf("garbage threshold must be between 0.0 and 1.0")
- } else {
- config.GarbageThreshold = threshold
- }
- }
-
- // Parse scan interval
- if intervalStr := formData["scan_interval"]; len(intervalStr) > 0 {
- if interval, err := time.ParseDuration(intervalStr[0]); err != nil {
- return nil, fmt.Errorf("invalid scan interval: %w", err)
- } else {
- config.ScanIntervalSeconds = durationToSeconds(interval)
- }
- }
-
- // Parse min volume age
- if ageStr := formData["min_volume_age"]; len(ageStr) > 0 {
- if age, err := time.ParseDuration(ageStr[0]); err != nil {
- return nil, fmt.Errorf("invalid min volume age: %w", err)
- } else {
- config.MinVolumeAgeSeconds = durationToSeconds(age)
- }
- }
-
- // Parse max concurrent
- if concurrentStr := formData["max_concurrent"]; len(concurrentStr) > 0 {
- if concurrent, err := strconv.Atoi(concurrentStr[0]); err != nil {
- return nil, fmt.Errorf("invalid max concurrent: %w", err)
- } else if concurrent < 1 {
- return nil, fmt.Errorf("max concurrent must be at least 1")
- } else {
- config.MaxConcurrent = concurrent
- }
- }
-
- // Parse min interval
- if intervalStr := formData["min_interval"]; len(intervalStr) > 0 {
- if interval, err := time.ParseDuration(intervalStr[0]); err != nil {
- return nil, fmt.Errorf("invalid min interval: %w", err)
- } else {
- config.MinIntervalSeconds = durationToSeconds(interval)
- }
- }
-
- return config, nil
-}
-
-// GetCurrentConfig returns the current configuration
-func (ui *UIProvider) GetCurrentConfig() interface{} {
- return ui.getCurrentVacuumConfig()
-}
-
-// ApplyConfig applies the new configuration
-func (ui *UIProvider) ApplyConfig(config interface{}) error {
- vacuumConfig, ok := config.(*VacuumConfig)
- if !ok {
- return fmt.Errorf("invalid config type, expected *VacuumConfig")
- }
-
- // Apply to detector
- if ui.detector != nil {
- ui.detector.SetEnabled(vacuumConfig.Enabled)
- ui.detector.SetGarbageThreshold(vacuumConfig.GarbageThreshold)
- ui.detector.SetScanInterval(secondsToDuration(vacuumConfig.ScanIntervalSeconds))
- ui.detector.SetMinVolumeAge(secondsToDuration(vacuumConfig.MinVolumeAgeSeconds))
- }
-
- // Apply to scheduler
- if ui.scheduler != nil {
- ui.scheduler.SetEnabled(vacuumConfig.Enabled)
- ui.scheduler.SetMaxConcurrent(vacuumConfig.MaxConcurrent)
- ui.scheduler.SetMinInterval(secondsToDuration(vacuumConfig.MinIntervalSeconds))
- }
-
- glog.V(1).Infof("Applied vacuum configuration: enabled=%v, threshold=%.1f%%, scan_interval=%s, max_concurrent=%d",
- vacuumConfig.Enabled, vacuumConfig.GarbageThreshold*100, formatDurationForUser(vacuumConfig.ScanIntervalSeconds), vacuumConfig.MaxConcurrent)
-
- return nil
-}
-
-// getCurrentVacuumConfig gets the current configuration from detector and scheduler
-func (ui *UIProvider) getCurrentVacuumConfig() *VacuumConfig {
- config := &VacuumConfig{
- // Default values (fallback if detectors/schedulers are nil)
- Enabled: true,
- GarbageThreshold: 0.3,
- ScanIntervalSeconds: 30 * 60,
- MinVolumeAgeSeconds: 1 * 60 * 60,
- MaxConcurrent: 2,
- MinIntervalSeconds: 6 * 60 * 60,
- }
-
- // Get current values from detector
- if ui.detector != nil {
- config.Enabled = ui.detector.IsEnabled()
- config.GarbageThreshold = ui.detector.GetGarbageThreshold()
- config.ScanIntervalSeconds = durationToSeconds(ui.detector.ScanInterval())
- config.MinVolumeAgeSeconds = durationToSeconds(ui.detector.GetMinVolumeAge())
- }
-
- // Get current values from scheduler
- if ui.scheduler != nil {
- config.MaxConcurrent = ui.scheduler.GetMaxConcurrent()
- config.MinIntervalSeconds = durationToSeconds(ui.scheduler.GetMinInterval())
- }
-
- return config
-}
-
-// RegisterUI registers the vacuum UI provider with the UI registry
-func RegisterUI(uiRegistry *types.UIRegistry, detector *VacuumDetector, scheduler *VacuumScheduler) {
- uiProvider := NewUIProvider(detector, scheduler)
- uiRegistry.RegisterUI(uiProvider)
-
- glog.V(1).Infof("✅ Registered vacuum task UI provider")
-}
-
-// Example: How to get the UI provider for external use
-func GetUIProvider(uiRegistry *types.UIRegistry) *UIProvider {
- provider := uiRegistry.GetProvider(types.TaskTypeVacuum)
- if provider == nil {
- return nil
- }
-
- if vacuumProvider, ok := provider.(*UIProvider); ok {
- return vacuumProvider
- }
-
- return nil
-}
diff --git a/weed/worker/tasks/vacuum/vacuum.go b/weed/worker/tasks/vacuum/vacuum.go
index dbfe35cf8..9cd254958 100644
--- a/weed/worker/tasks/vacuum/vacuum.go
+++ b/weed/worker/tasks/vacuum/vacuum.go
@@ -1,60 +1,184 @@
package vacuum
import (
+ "context"
"fmt"
+ "io"
"time"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
)
// Task implements vacuum operation to reclaim disk space
type Task struct {
*tasks.BaseTask
- server string
- volumeID uint32
+ server string
+ volumeID uint32
+ garbageThreshold float64
}
// NewTask creates a new vacuum task instance
func NewTask(server string, volumeID uint32) *Task {
task := &Task{
- BaseTask: tasks.NewBaseTask(types.TaskTypeVacuum),
- server: server,
- volumeID: volumeID,
+ BaseTask: tasks.NewBaseTask(types.TaskTypeVacuum),
+ server: server,
+ volumeID: volumeID,
+ garbageThreshold: 0.3, // Default 30% threshold
}
return task
}
-// Execute executes the vacuum task
+// Execute performs the vacuum operation
func (t *Task) Execute(params types.TaskParams) error {
- glog.Infof("Starting vacuum task for volume %d on server %s", t.volumeID, t.server)
-
- // Simulate vacuum operation with progress updates
- steps := []struct {
- name string
- duration time.Duration
- progress float64
- }{
- {"Scanning volume", 1 * time.Second, 20},
- {"Identifying deleted files", 2 * time.Second, 50},
- {"Compacting data", 3 * time.Second, 80},
- {"Finalizing vacuum", 1 * time.Second, 100},
+ // Use BaseTask.ExecuteTask to handle logging initialization
+ return t.ExecuteTask(context.Background(), params, t.executeImpl)
+}
+
+// executeImpl is the actual vacuum implementation
+func (t *Task) executeImpl(ctx context.Context, params types.TaskParams) error {
+ t.LogInfo("Starting vacuum for volume %d on server %s", t.volumeID, t.server)
+
+ // Parse garbage threshold from typed parameters
+ if params.TypedParams != nil {
+ if vacuumParams := params.TypedParams.GetVacuumParams(); vacuumParams != nil {
+ t.garbageThreshold = vacuumParams.GarbageThreshold
+ t.LogWithFields("INFO", "Using garbage threshold from parameters", map[string]interface{}{
+ "threshold": t.garbageThreshold,
+ })
+ }
+ }
+
+ // Convert server address to gRPC address and use proper dial option
+ grpcAddress := pb.ServerToGrpcAddress(t.server)
+ var dialOpt grpc.DialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
+ if params.GrpcDialOption != nil {
+ dialOpt = params.GrpcDialOption
+ }
+
+ conn, err := grpc.NewClient(grpcAddress, dialOpt)
+ if err != nil {
+ t.LogError("Failed to connect to volume server %s: %v", t.server, err)
+ return fmt.Errorf("failed to connect to volume server %s: %v", t.server, err)
+ }
+ defer conn.Close()
+
+ client := volume_server_pb.NewVolumeServerClient(conn)
+
+ // Step 1: Check vacuum eligibility
+ t.SetProgress(10.0)
+ t.LogDebug("Checking vacuum eligibility for volume %d", t.volumeID)
+
+ checkResp, err := client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{
+ VolumeId: t.volumeID,
+ })
+ if err != nil {
+ t.LogError("Vacuum check failed for volume %d: %v", t.volumeID, err)
+ return fmt.Errorf("vacuum check failed for volume %d: %v", t.volumeID, err)
+ }
+
+ // Check if garbage ratio meets threshold
+ if checkResp.GarbageRatio < t.garbageThreshold {
+ t.LogWarning("Volume %d garbage ratio %.2f%% is below threshold %.2f%%, skipping vacuum",
+ t.volumeID, checkResp.GarbageRatio*100, t.garbageThreshold*100)
+ return fmt.Errorf("volume %d garbage ratio %.2f%% is below threshold %.2f%%, skipping vacuum",
+ t.volumeID, checkResp.GarbageRatio*100, t.garbageThreshold*100)
+ }
+
+ t.LogWithFields("INFO", "Volume eligible for vacuum", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "garbage_ratio": checkResp.GarbageRatio,
+ "threshold": t.garbageThreshold,
+ "garbage_percent": checkResp.GarbageRatio * 100,
+ })
+
+ // Step 2: Compact volume
+ t.SetProgress(30.0)
+ t.LogInfo("Starting compact for volume %d", t.volumeID)
+
+ compactStream, err := client.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{
+ VolumeId: t.volumeID,
+ })
+ if err != nil {
+ t.LogError("Vacuum compact failed for volume %d: %v", t.volumeID, err)
+ return fmt.Errorf("vacuum compact failed for volume %d: %v", t.volumeID, err)
}
- for _, step := range steps {
- if t.IsCancelled() {
- return fmt.Errorf("vacuum task cancelled")
+ // Process compact stream and track progress
+ var processedBytes int64
+ var totalBytes int64
+
+ for {
+ resp, err := compactStream.Recv()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ t.LogError("Vacuum compact stream error for volume %d: %v", t.volumeID, err)
+ return fmt.Errorf("vacuum compact stream error for volume %d: %v", t.volumeID, err)
}
- glog.V(1).Infof("Vacuum task step: %s", step.name)
- t.SetProgress(step.progress)
+ processedBytes = resp.ProcessedBytes
+		if resp.LoadAvg_1M > 0 {
+			totalBytes = int64(resp.LoadAvg_1M) // rough stand-in for total bytes; progress derived from it is approximate
+		}
+
+ // Update progress based on processed bytes (30% to 70% of total progress)
+ if totalBytes > 0 {
+ compactProgress := float64(processedBytes) / float64(totalBytes)
+ if compactProgress > 1.0 {
+ compactProgress = 1.0
+ }
+ progress := 30.0 + (compactProgress * 40.0) // 30% to 70%
+ t.SetProgress(progress)
+ }
- // Simulate work
- time.Sleep(step.duration)
+ t.LogWithFields("DEBUG", "Volume compact progress", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "processed_bytes": processedBytes,
+ "total_bytes": totalBytes,
+ "compact_progress": fmt.Sprintf("%.1f%%", (float64(processedBytes)/float64(totalBytes))*100),
+ })
}
- glog.Infof("Vacuum task completed for volume %d on server %s", t.volumeID, t.server)
+ // Step 3: Commit vacuum changes
+ t.SetProgress(80.0)
+ t.LogInfo("Committing vacuum for volume %d", t.volumeID)
+
+ commitResp, err := client.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{
+ VolumeId: t.volumeID,
+ })
+ if err != nil {
+ t.LogError("Vacuum commit failed for volume %d: %v", t.volumeID, err)
+ return fmt.Errorf("vacuum commit failed for volume %d: %v", t.volumeID, err)
+ }
+
+ // Step 4: Cleanup temporary files
+ t.SetProgress(90.0)
+ t.LogInfo("Cleaning up vacuum files for volume %d", t.volumeID)
+
+ _, err = client.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{
+ VolumeId: t.volumeID,
+ })
+ if err != nil {
+ // Log warning but don't fail the task
+ t.LogWarning("Vacuum cleanup warning for volume %d: %v", t.volumeID, err)
+ }
+
+ t.SetProgress(100.0)
+
+ newVolumeSize := commitResp.VolumeSize
+ t.LogWithFields("INFO", "Successfully completed vacuum", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "new_volume_size": newVolumeSize,
+ "garbage_reclaimed": true,
+ })
+
return nil
}
@@ -71,9 +195,20 @@ func (t *Task) Validate(params types.TaskParams) error {
// EstimateTime estimates the time needed for the task
func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
- // Base time for vacuum operation
- baseTime := 25 * time.Second
+ // Base time for vacuum operations - varies by volume size and garbage ratio
+ // Typically vacuum is faster than EC encoding
+ baseTime := 5 * time.Minute
- // Could adjust based on volume size or usage patterns
+ // Use default estimation since volume size is not available in typed params
return baseTime
}
+
+// GetProgress returns the current progress
+func (t *Task) GetProgress() float64 {
+ return t.BaseTask.GetProgress()
+}
+
+// Cancel cancels the task
+func (t *Task) Cancel() error {
+ return t.BaseTask.Cancel()
+}
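Should volume size ever be carried in the typed parameters, EstimateTime could scale with it instead of returning a flat value. A sketch under that assumption (volumeSizeBytes is a hypothetical input, and the per-5GiB factor is purely illustrative):

    func estimateVacuumTime(volumeSizeBytes uint64) time.Duration {
        base := 5 * time.Minute
        // add roughly one minute per 5 GiB beyond the base allowance
        extra := time.Duration(volumeSizeBytes/(5<<30)) * time.Minute
        return base + extra
    }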
diff --git a/weed/worker/tasks/vacuum/vacuum_detector.go b/weed/worker/tasks/vacuum/vacuum_detector.go
deleted file mode 100644
index 6d7230c6c..000000000
--- a/weed/worker/tasks/vacuum/vacuum_detector.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package vacuum
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// VacuumDetector implements vacuum task detection using code instead of schemas
-type VacuumDetector struct {
- enabled bool
- garbageThreshold float64
- minVolumeAge time.Duration
- scanInterval time.Duration
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskDetector = (*VacuumDetector)(nil)
- _ types.PolicyConfigurableDetector = (*VacuumDetector)(nil)
-)
-
-// NewVacuumDetector creates a new simple vacuum detector
-func NewVacuumDetector() *VacuumDetector {
- return &VacuumDetector{
- enabled: true,
- garbageThreshold: 0.3,
- minVolumeAge: 24 * time.Hour,
- scanInterval: 30 * time.Minute,
- }
-}
-
-// GetTaskType returns the task type
-func (d *VacuumDetector) GetTaskType() types.TaskType {
- return types.TaskTypeVacuum
-}
-
-// ScanForTasks scans for volumes that need vacuum operations
-func (d *VacuumDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if !d.enabled {
- return nil, nil
- }
-
- var results []*types.TaskDetectionResult
-
- for _, metric := range volumeMetrics {
- // Check if volume needs vacuum
- if metric.GarbageRatio >= d.garbageThreshold && metric.Age >= d.minVolumeAge {
- // Higher priority for volumes with more garbage
- priority := types.TaskPriorityNormal
- if metric.GarbageRatio > 0.6 {
- priority = types.TaskPriorityHigh
- }
-
- result := &types.TaskDetectionResult{
- TaskType: types.TaskTypeVacuum,
- VolumeID: metric.VolumeID,
- Server: metric.Server,
- Collection: metric.Collection,
- Priority: priority,
- Reason: "Volume has excessive garbage requiring vacuum",
- Parameters: map[string]interface{}{
- "garbage_ratio": metric.GarbageRatio,
- "volume_age": metric.Age.String(),
- },
- ScheduleAt: time.Now(),
- }
- results = append(results, result)
- }
- }
-
- glog.V(2).Infof("Vacuum detector found %d volumes needing vacuum", len(results))
- return results, nil
-}
-
-// ScanInterval returns how often this detector should scan
-func (d *VacuumDetector) ScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// IsEnabled returns whether this detector is enabled
-func (d *VacuumDetector) IsEnabled() bool {
- return d.enabled
-}
-
-// Configuration setters
-
-func (d *VacuumDetector) SetEnabled(enabled bool) {
- d.enabled = enabled
-}
-
-func (d *VacuumDetector) SetGarbageThreshold(threshold float64) {
- d.garbageThreshold = threshold
-}
-
-func (d *VacuumDetector) SetScanInterval(interval time.Duration) {
- d.scanInterval = interval
-}
-
-func (d *VacuumDetector) SetMinVolumeAge(age time.Duration) {
- d.minVolumeAge = age
-}
-
-// GetGarbageThreshold returns the current garbage threshold
-func (d *VacuumDetector) GetGarbageThreshold() float64 {
- return d.garbageThreshold
-}
-
-// GetMinVolumeAge returns the minimum volume age
-func (d *VacuumDetector) GetMinVolumeAge() time.Duration {
- return d.minVolumeAge
-}
-
-// GetScanInterval returns the scan interval
-func (d *VacuumDetector) GetScanInterval() time.Duration {
- return d.scanInterval
-}
-
-// ConfigureFromPolicy configures the detector based on the maintenance policy
-func (d *VacuumDetector) ConfigureFromPolicy(policy interface{}) {
- // Type assert to the maintenance policy type we expect
- if maintenancePolicy, ok := policy.(interface {
- GetVacuumEnabled() bool
- GetVacuumGarbageRatio() float64
- }); ok {
- d.SetEnabled(maintenancePolicy.GetVacuumEnabled())
- d.SetGarbageThreshold(maintenancePolicy.GetVacuumGarbageRatio())
- } else {
- glog.V(1).Infof("Could not configure vacuum detector from policy: unsupported policy type")
- }
-}
diff --git a/weed/worker/tasks/vacuum/vacuum_register.go b/weed/worker/tasks/vacuum/vacuum_register.go
index 7d930a88e..d660c9d42 100644
--- a/weed/worker/tasks/vacuum/vacuum_register.go
+++ b/weed/worker/tasks/vacuum/vacuum_register.go
@@ -2,80 +2,71 @@ package vacuum
import (
"fmt"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
-// Factory creates vacuum task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
+// Global variable to hold the task definition for configuration updates
+var globalTaskDef *base.TaskDefinition
-// NewFactory creates a new vacuum task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeVacuum,
- []string{"vacuum", "storage"},
- "Vacuum operation to reclaim disk space by removing deleted files",
- ),
- }
-}
-
-// Create creates a new vacuum task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID)
- task.SetEstimatedDuration(task.EstimateTime(params))
+// Auto-register this task when the package is imported
+func init() {
+ RegisterVacuumTask()
- return task, nil
+ // Register config updater
+ tasks.AutoRegisterConfigUpdater(types.TaskTypeVacuum, UpdateConfigFromPersistence)
}
-// Shared detector and scheduler instances
-var (
- sharedDetector *VacuumDetector
- sharedScheduler *VacuumScheduler
-)
+// RegisterVacuumTask registers the vacuum task with the new architecture
+func RegisterVacuumTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*VacuumDetector, *VacuumScheduler) {
- if sharedDetector == nil {
- sharedDetector = NewVacuumDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewVacuumScheduler()
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeVacuum,
+ Name: "vacuum",
+ DisplayName: "Volume Vacuum",
+ Description: "Reclaims disk space by removing deleted files from volumes",
+ Icon: "fas fa-broom text-primary",
+ Capabilities: []string{"vacuum", "storage"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: CreateTask,
+ DetectionFunc: Detection,
+ ScanInterval: 2 * time.Hour,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 2,
+ RepeatInterval: 7 * 24 * time.Hour,
}
- return sharedDetector, sharedScheduler
-}
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*VacuumDetector, *VacuumScheduler) {
- return getSharedInstances()
+ // Store task definition globally for configuration updates
+ globalTaskDef = taskDef
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
-// Auto-register this task when the package is imported
-func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeVacuum, factory)
+// UpdateConfigFromPersistence updates the vacuum configuration from persistence
+func UpdateConfigFromPersistence(configPersistence interface{}) error {
+ if globalTaskDef == nil {
+ return fmt.Errorf("vacuum task not registered")
+ }
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
+ // Load configuration from persistence
+ newConfig := LoadConfigFromPersistence(configPersistence)
+ if newConfig == nil {
+ return fmt.Errorf("failed to load configuration from persistence")
+ }
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
+ // Update the task definition's config
+ globalTaskDef.Config = newConfig
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ glog.V(1).Infof("Updated vacuum task configuration from persistence")
+ return nil
}
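For context, UpdateConfigFromPersistence is invoked through the updater registry that the init function above feeds via tasks.AutoRegisterConfigUpdater. A minimal sketch of how such a registry might dispatch updates (the names and map-based layout are assumptions, not the actual tasks package internals):

    var configUpdaters = map[types.TaskType]func(interface{}) error{}

    func AutoRegisterConfigUpdater(t types.TaskType, fn func(interface{}) error) {
        configUpdaters[t] = fn
    }

    // ApplyAll hands the persistence handle to every registered updater.
    func ApplyAll(persistence interface{}) {
        for t, fn := range configUpdaters {
            if err := fn(persistence); err != nil {
                glog.Warningf("config update for %s failed: %v", t, err)
            }
        }
    }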
diff --git a/weed/worker/tasks/vacuum/vacuum_scheduler.go b/weed/worker/tasks/vacuum/vacuum_scheduler.go
deleted file mode 100644
index 2b67a9f40..000000000
--- a/weed/worker/tasks/vacuum/vacuum_scheduler.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package vacuum
-
-import (
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
-)
-
-// VacuumScheduler implements vacuum task scheduling using code instead of schemas
-type VacuumScheduler struct {
- enabled bool
- maxConcurrent int
- minInterval time.Duration
-}
-
-// Compile-time interface assertions
-var (
- _ types.TaskScheduler = (*VacuumScheduler)(nil)
-)
-
-// NewVacuumScheduler creates a new simple vacuum scheduler
-func NewVacuumScheduler() *VacuumScheduler {
- return &VacuumScheduler{
- enabled: true,
- maxConcurrent: 2,
- minInterval: 6 * time.Hour,
- }
-}
-
-// GetTaskType returns the task type
-func (s *VacuumScheduler) GetTaskType() types.TaskType {
- return types.TaskTypeVacuum
-}
-
-// CanScheduleNow determines if a vacuum task can be scheduled right now
-func (s *VacuumScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
- // Check if scheduler is enabled
- if !s.enabled {
- return false
- }
-
- // Check concurrent limit
- runningVacuumCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeVacuum {
- runningVacuumCount++
- }
- }
-
- if runningVacuumCount >= s.maxConcurrent {
- return false
- }
-
- // Check if there's an available worker with vacuum capability
- for _, worker := range availableWorkers {
- if worker.CurrentLoad < worker.MaxConcurrent {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeVacuum {
- return true
- }
- }
- }
- }
-
- return false
-}
-
-// GetPriority returns the priority for this task
-func (s *VacuumScheduler) GetPriority(task *types.Task) types.TaskPriority {
- // Could adjust priority based on task parameters
- if params, ok := task.Parameters["garbage_ratio"].(float64); ok {
- if params > 0.8 {
- return types.TaskPriorityHigh
- }
- }
- return task.Priority
-}
-
-// GetMaxConcurrent returns max concurrent tasks of this type
-func (s *VacuumScheduler) GetMaxConcurrent() int {
- return s.maxConcurrent
-}
-
-// GetDefaultRepeatInterval returns the default interval to wait before repeating vacuum tasks
-func (s *VacuumScheduler) GetDefaultRepeatInterval() time.Duration {
- return s.minInterval
-}
-
-// IsEnabled returns whether this scheduler is enabled
-func (s *VacuumScheduler) IsEnabled() bool {
- return s.enabled
-}
-
-// Configuration setters
-
-func (s *VacuumScheduler) SetEnabled(enabled bool) {
- s.enabled = enabled
-}
-
-func (s *VacuumScheduler) SetMaxConcurrent(max int) {
- s.maxConcurrent = max
-}
-
-func (s *VacuumScheduler) SetMinInterval(interval time.Duration) {
- s.minInterval = interval
-}
-
-// GetMinInterval returns the minimum interval
-func (s *VacuumScheduler) GetMinInterval() time.Duration {
- return s.minInterval
-}
diff --git a/weed/worker/types/config_types.go b/weed/worker/types/config_types.go
index 8e4113580..5a9e94fd5 100644
--- a/weed/worker/types/config_types.go
+++ b/weed/worker/types/config_types.go
@@ -3,6 +3,8 @@ package types
import (
"sync"
"time"
+
+ "google.golang.org/grpc"
)
// WorkerConfig represents the configuration for a worker
@@ -12,7 +14,9 @@ type WorkerConfig struct {
MaxConcurrent int `json:"max_concurrent"`
HeartbeatInterval time.Duration `json:"heartbeat_interval"`
TaskRequestInterval time.Duration `json:"task_request_interval"`
+ BaseWorkingDir string `json:"base_working_dir,omitempty"`
CustomParameters map[string]interface{} `json:"custom_parameters,omitempty"`
+ GrpcDialOption grpc.DialOption `json:"-"` // Not serializable, for runtime use only
}
// MaintenanceConfig represents the configuration for the maintenance system
diff --git a/weed/worker/types/data_types.go b/weed/worker/types/data_types.go
index 4a018563a..c6ba62a18 100644
--- a/weed/worker/types/data_types.go
+++ b/weed/worker/types/data_types.go
@@ -16,6 +16,8 @@ type ClusterInfo struct {
type VolumeHealthMetrics struct {
VolumeID uint32
Server string
+ DiskType string // Disk type (e.g., "hdd", "ssd") or disk path (e.g., "/data1")
+ DiskId uint32 // ID of the disk in Store.Locations array
Collection string
Size uint64
DeletedBytes uint64
diff --git a/weed/worker/types/task_types.go b/weed/worker/types/task_types.go
index b0fdb009f..dc454c211 100644
--- a/weed/worker/types/task_types.go
+++ b/weed/worker/types/task_types.go
@@ -2,6 +2,9 @@ package types
import (
"time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "google.golang.org/grpc"
)
// TaskType represents the type of maintenance task
@@ -11,6 +14,7 @@ const (
TaskTypeVacuum TaskType = "vacuum"
TaskTypeErasureCoding TaskType = "erasure_coding"
TaskTypeBalance TaskType = "balance"
+ TaskTypeReplication TaskType = "replication"
)
// TaskStatus represents the status of a maintenance task
@@ -26,53 +30,57 @@ const (
)
// TaskPriority represents the priority of a maintenance task
-type TaskPriority int
+type TaskPriority string
const (
- TaskPriorityLow TaskPriority = 1
- TaskPriorityNormal TaskPriority = 5
- TaskPriorityHigh TaskPriority = 10
+ TaskPriorityLow TaskPriority = "low"
+ TaskPriorityMedium TaskPriority = "medium"
+ TaskPriorityNormal TaskPriority = "normal"
+ TaskPriorityHigh TaskPriority = "high"
+ TaskPriorityCritical TaskPriority = "critical"
)
// Task represents a maintenance task
type Task struct {
- ID string `json:"id"`
- Type TaskType `json:"type"`
- Status TaskStatus `json:"status"`
- Priority TaskPriority `json:"priority"`
- VolumeID uint32 `json:"volume_id,omitempty"`
- Server string `json:"server,omitempty"`
- Collection string `json:"collection,omitempty"`
- WorkerID string `json:"worker_id,omitempty"`
- Progress float64 `json:"progress"`
- Error string `json:"error,omitempty"`
- Parameters map[string]interface{} `json:"parameters,omitempty"`
- CreatedAt time.Time `json:"created_at"`
- ScheduledAt time.Time `json:"scheduled_at"`
- StartedAt *time.Time `json:"started_at,omitempty"`
- CompletedAt *time.Time `json:"completed_at,omitempty"`
- RetryCount int `json:"retry_count"`
- MaxRetries int `json:"max_retries"`
+ ID string `json:"id"`
+ Type TaskType `json:"type"`
+ Status TaskStatus `json:"status"`
+ Priority TaskPriority `json:"priority"`
+ VolumeID uint32 `json:"volume_id,omitempty"`
+ Server string `json:"server,omitempty"`
+ Collection string `json:"collection,omitempty"`
+ WorkerID string `json:"worker_id,omitempty"`
+ Progress float64 `json:"progress"`
+ Error string `json:"error,omitempty"`
+ TypedParams *worker_pb.TaskParams `json:"typed_params,omitempty"`
+ CreatedAt time.Time `json:"created_at"`
+ ScheduledAt time.Time `json:"scheduled_at"`
+ StartedAt *time.Time `json:"started_at,omitempty"`
+ CompletedAt *time.Time `json:"completed_at,omitempty"`
+ RetryCount int `json:"retry_count"`
+ MaxRetries int `json:"max_retries"`
}
// TaskParams represents parameters for task execution
type TaskParams struct {
- VolumeID uint32 `json:"volume_id,omitempty"`
- Server string `json:"server,omitempty"`
- Collection string `json:"collection,omitempty"`
- Parameters map[string]interface{} `json:"parameters,omitempty"`
+ VolumeID uint32 `json:"volume_id,omitempty"`
+ Server string `json:"server,omitempty"`
+ Collection string `json:"collection,omitempty"`
+ WorkingDir string `json:"working_dir,omitempty"`
+ TypedParams *worker_pb.TaskParams `json:"typed_params,omitempty"`
+ GrpcDialOption grpc.DialOption `json:"-"` // Not serializable, for runtime use only
}
// TaskDetectionResult represents the result of scanning for maintenance needs
type TaskDetectionResult struct {
- TaskType TaskType `json:"task_type"`
- VolumeID uint32 `json:"volume_id,omitempty"`
- Server string `json:"server,omitempty"`
- Collection string `json:"collection,omitempty"`
- Priority TaskPriority `json:"priority"`
- Reason string `json:"reason"`
- Parameters map[string]interface{} `json:"parameters,omitempty"`
- ScheduleAt time.Time `json:"schedule_at"`
+ TaskType TaskType `json:"task_type"`
+ VolumeID uint32 `json:"volume_id,omitempty"`
+ Server string `json:"server,omitempty"`
+ Collection string `json:"collection,omitempty"`
+ Priority TaskPriority `json:"priority"`
+ Reason string `json:"reason"`
+ TypedParams *worker_pb.TaskParams `json:"typed_params,omitempty"`
+ ScheduleAt time.Time `json:"schedule_at"`
}
// ClusterReplicationTask represents a cluster replication task parameters
diff --git a/weed/worker/types/task_ui.go b/weed/worker/types/task_ui.go
index e1e2752ba..9294127a8 100644
--- a/weed/worker/types/task_ui.go
+++ b/weed/worker/types/task_ui.go
@@ -1,12 +1,60 @@
package types
import (
- "fmt"
- "html/template"
"time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)
+// Helper function to convert seconds to the most appropriate interval unit
+func secondsToIntervalValueUnit(totalSeconds int) (int, string) {
+ if totalSeconds == 0 {
+ return 0, "minute"
+ }
+
+ // Check if it's evenly divisible by days
+ if totalSeconds%(24*3600) == 0 {
+ return totalSeconds / (24 * 3600), "day"
+ }
+
+ // Check if it's evenly divisible by hours
+ if totalSeconds%3600 == 0 {
+ return totalSeconds / 3600, "hour"
+ }
+
+ // Default to minutes
+ return totalSeconds / 60, "minute"
+}
+
+// Helper function to convert interval value and unit to seconds
+func IntervalValueUnitToSeconds(value int, unit string) int {
+ switch unit {
+ case "day":
+ return value * 24 * 3600
+ case "hour":
+ return value * 3600
+ case "minute":
+ return value * 60
+ default:
+ return value * 60 // Default to minutes
+ }
+}
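A quick round-trip inside the types package shows the two helpers are inverses for cleanly divisible values:

    seconds := IntervalValueUnitToSeconds(2, "day") // 172800
    value, unit := secondsToIntervalValueUnit(seconds)
    // value == 2, unit == "day"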
+
+// TaskConfig defines the interface for task configurations.
+// It matches the interfaces used in the base package and in the handlers.
+type TaskConfig interface {
+ // Common methods from BaseConfig
+ IsEnabled() bool
+ SetEnabled(enabled bool)
+ Validate() error
+
+ // Protobuf serialization methods; these replace the old interface{}-based round-trips
+ ToTaskPolicy() *worker_pb.TaskPolicy
+ FromTaskPolicy(policy *worker_pb.TaskPolicy) error
+}
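A minimal sketch of a type satisfying TaskConfig; the worker_pb.TaskPolicy field names used here (Enabled, MaxConcurrent) are assumptions for illustration:

    type exampleConfig struct {
        enabled       bool
        maxConcurrent int
    }

    func (c *exampleConfig) IsEnabled() bool   { return c.enabled }
    func (c *exampleConfig) SetEnabled(e bool) { c.enabled = e }
    func (c *exampleConfig) Validate() error   { return nil }

    func (c *exampleConfig) ToTaskPolicy() *worker_pb.TaskPolicy {
        // assumed TaskPolicy fields; adjust to the actual protobuf schema
        return &worker_pb.TaskPolicy{Enabled: c.enabled, MaxConcurrent: int32(c.maxConcurrent)}
    }

    func (c *exampleConfig) FromTaskPolicy(p *worker_pb.TaskPolicy) error {
        c.enabled = p.Enabled
        c.maxConcurrent = int(p.MaxConcurrent)
        return nil
    }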
+
// TaskUIProvider defines how tasks provide their configuration UI
+// This interface is simplified to work with schema-driven configuration
type TaskUIProvider interface {
// GetTaskType returns the task type
GetTaskType() TaskType
@@ -20,17 +68,14 @@ type TaskUIProvider interface {
// GetIcon returns the icon CSS class or HTML for this task type
GetIcon() string
- // RenderConfigForm renders the configuration form HTML
- RenderConfigForm(currentConfig interface{}) (template.HTML, error)
+ // GetCurrentConfig returns the current configuration as TaskConfig
+ GetCurrentConfig() TaskConfig
- // ParseConfigForm parses form data into configuration
- ParseConfigForm(formData map[string][]string) (interface{}, error)
+ // ApplyTaskPolicy applies protobuf TaskPolicy configuration
+ ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error
- // GetCurrentConfig returns the current configuration
- GetCurrentConfig() interface{}
-
- // ApplyConfig applies the new configuration
- ApplyConfig(config interface{}) error
+ // ApplyTaskConfig applies TaskConfig interface configuration
+ ApplyTaskConfig(config TaskConfig) error
}
// TaskStats represents runtime statistics for a task type
@@ -87,195 +132,10 @@ type TaskListData struct {
}
type TaskDetailsData struct {
- Task *Task `json:"task"`
- TaskType TaskType `json:"task_type"`
- DisplayName string `json:"display_name"`
- Description string `json:"description"`
- Stats *TaskStats `json:"stats"`
- ConfigForm template.HTML `json:"config_form"`
- LastUpdated time.Time `json:"last_updated"`
-}
-
-// Common form field types for simple form building
-type FormField struct {
- Name string `json:"name"`
- Label string `json:"label"`
- Type string `json:"type"` // text, number, checkbox, select, duration
- Value interface{} `json:"value"`
- Description string `json:"description"`
- Required bool `json:"required"`
- Options []FormOption `json:"options,omitempty"` // For select fields
-}
-
-type FormOption struct {
- Value string `json:"value"`
- Label string `json:"label"`
-}
-
-// Helper for building forms in code
-type FormBuilder struct {
- fields []FormField
-}
-
-// NewFormBuilder creates a new form builder
-func NewFormBuilder() *FormBuilder {
- return &FormBuilder{
- fields: make([]FormField, 0),
- }
-}
-
-// AddTextField adds a text input field
-func (fb *FormBuilder) AddTextField(name, label, description string, value string, required bool) *FormBuilder {
- fb.fields = append(fb.fields, FormField{
- Name: name,
- Label: label,
- Type: "text",
- Value: value,
- Description: description,
- Required: required,
- })
- return fb
-}
-
-// AddNumberField adds a number input field
-func (fb *FormBuilder) AddNumberField(name, label, description string, value float64, required bool) *FormBuilder {
- fb.fields = append(fb.fields, FormField{
- Name: name,
- Label: label,
- Type: "number",
- Value: value,
- Description: description,
- Required: required,
- })
- return fb
-}
-
-// AddCheckboxField adds a checkbox field
-func (fb *FormBuilder) AddCheckboxField(name, label, description string, value bool) *FormBuilder {
- fb.fields = append(fb.fields, FormField{
- Name: name,
- Label: label,
- Type: "checkbox",
- Value: value,
- Description: description,
- Required: false,
- })
- return fb
-}
-
-// AddSelectField adds a select dropdown field
-func (fb *FormBuilder) AddSelectField(name, label, description string, value string, options []FormOption, required bool) *FormBuilder {
- fb.fields = append(fb.fields, FormField{
- Name: name,
- Label: label,
- Type: "select",
- Value: value,
- Description: description,
- Required: required,
- Options: options,
- })
- return fb
-}
-
-// AddDurationField adds a duration input field
-func (fb *FormBuilder) AddDurationField(name, label, description string, value time.Duration, required bool) *FormBuilder {
- fb.fields = append(fb.fields, FormField{
- Name: name,
- Label: label,
- Type: "duration",
- Value: value.String(),
- Description: description,
- Required: required,
- })
- return fb
-}
-
-// Build generates the HTML form fields with Bootstrap styling
-func (fb *FormBuilder) Build() template.HTML {
- html := ""
-
- for _, field := range fb.fields {
- html += fb.renderField(field)
- }
-
- return template.HTML(html)
-}
-
-// renderField renders a single form field with Bootstrap classes
-func (fb *FormBuilder) renderField(field FormField) string {
- html := "<div class=\"mb-3\">\n"
-
- // Special handling for checkbox fields
- if field.Type == "checkbox" {
- checked := ""
- if field.Value.(bool) {
- checked = " checked"
- }
- html += " <div class=\"form-check\">\n"
- html += " <input type=\"checkbox\" class=\"form-check-input\" id=\"" + field.Name + "\" name=\"" + field.Name + "\"" + checked + ">\n"
- html += " <label class=\"form-check-label\" for=\"" + field.Name + "\">" + field.Label + "</label>\n"
- html += " </div>\n"
- // Description for checkbox
- if field.Description != "" {
- html += " <div class=\"form-text text-muted\">" + field.Description + "</div>\n"
- }
- html += "</div>\n"
- return html
- }
-
- // Label for non-checkbox fields
- required := ""
- if field.Required {
- required = " <span class=\"text-danger\">*</span>"
- }
- html += " <label for=\"" + field.Name + "\" class=\"form-label\">" + field.Label + required + "</label>\n"
-
- // Input based on type
- switch field.Type {
- case "text":
- html += " <input type=\"text\" class=\"form-control\" id=\"" + field.Name + "\" name=\"" + field.Name + "\" value=\"" + field.Value.(string) + "\""
- if field.Required {
- html += " required"
- }
- html += ">\n"
-
- case "number":
- html += " <input type=\"number\" class=\"form-control\" id=\"" + field.Name + "\" name=\"" + field.Name + "\" step=\"any\" value=\"" +
- fmt.Sprintf("%v", field.Value) + "\""
- if field.Required {
- html += " required"
- }
- html += ">\n"
-
- case "select":
- html += " <select class=\"form-select\" id=\"" + field.Name + "\" name=\"" + field.Name + "\""
- if field.Required {
- html += " required"
- }
- html += ">\n"
- for _, option := range field.Options {
- selected := ""
- if option.Value == field.Value.(string) {
- selected = " selected"
- }
- html += " <option value=\"" + option.Value + "\"" + selected + ">" + option.Label + "</option>\n"
- }
- html += " </select>\n"
-
- case "duration":
- html += " <input type=\"text\" class=\"form-control\" id=\"" + field.Name + "\" name=\"" + field.Name + "\" value=\"" + field.Value.(string) +
- "\" placeholder=\"e.g., 30m, 2h, 24h\""
- if field.Required {
- html += " required"
- }
- html += ">\n"
- }
-
- // Description for non-checkbox fields
- if field.Description != "" {
- html += " <div class=\"form-text text-muted\">" + field.Description + "</div>\n"
- }
-
- html += "</div>\n"
- return html
+ Task *Task `json:"task"`
+ TaskType TaskType `json:"task_type"`
+ DisplayName string `json:"display_name"`
+ Description string `json:"description"`
+ Stats *TaskStats `json:"stats"`
+ LastUpdated time.Time `json:"last_updated"`
}
diff --git a/weed/worker/types/typed_task_interface.go b/weed/worker/types/typed_task_interface.go
new file mode 100644
index 000000000..3dffe510c
--- /dev/null
+++ b/weed/worker/types/typed_task_interface.go
@@ -0,0 +1,121 @@
+package types
+
+import (
+ "errors"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+)
+
+var (
+ // ErrTaskTypeNotFound is returned when a task type is not registered
+ ErrTaskTypeNotFound = errors.New("task type not found")
+)
+
+// TaskLogger interface for task logging (minimal definition to avoid import cycles)
+type TaskLogger interface {
+ Info(message string, args ...interface{})
+ Warning(message string, args ...interface{})
+ Error(message string, args ...interface{})
+ Debug(message string, args ...interface{})
+ LogWithFields(level string, message string, fields map[string]interface{})
+ Close() error
+}
+
+// TaskLoggerConfig holds configuration for task logging (minimal definition)
+type TaskLoggerConfig struct {
+ BaseLogDir string
+ MaxTasks int
+ MaxLogSizeMB int
+ EnableConsole bool
+}
+
+// TypedTaskInterface defines the interface for tasks using typed protobuf parameters
+type TypedTaskInterface interface {
+ // Execute the task with typed protobuf parameters
+ ExecuteTyped(params *worker_pb.TaskParams) error
+
+ // Validate typed task parameters
+ ValidateTyped(params *worker_pb.TaskParams) error
+
+ // Estimate execution time based on typed parameters
+ EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration
+
+ // Get task type
+ GetType() TaskType
+
+ // Check if task can be cancelled
+ IsCancellable() bool
+
+ // Cancel the task if running
+ Cancel() error
+
+ // Get current progress (0-100)
+ GetProgress() float64
+
+ // Set progress callback for progress updates
+ SetProgressCallback(callback func(float64))
+
+ // Logger configuration and initialization (all typed tasks support this)
+ SetLoggerConfig(config TaskLoggerConfig)
+ InitializeTaskLogger(taskID string, workerID string, params TaskParams) error
+ GetTaskLogger() TaskLogger
+
+ // Logging methods (all typed tasks support this)
+ LogInfo(message string, args ...interface{})
+ LogWarning(message string, args ...interface{})
+ LogError(message string, args ...interface{})
+ LogDebug(message string, args ...interface{})
+ LogWithFields(level string, message string, fields map[string]interface{})
+}
+
+// TypedTaskCreator is a function that creates a new typed task instance
+type TypedTaskCreator func() TypedTaskInterface
+
+// TypedTaskRegistry manages typed task creation
+type TypedTaskRegistry struct {
+ creators map[TaskType]TypedTaskCreator
+}
+
+// NewTypedTaskRegistry creates a new typed task registry
+func NewTypedTaskRegistry() *TypedTaskRegistry {
+ return &TypedTaskRegistry{
+ creators: make(map[TaskType]TypedTaskCreator),
+ }
+}
+
+// RegisterTypedTask registers a typed task creator
+func (r *TypedTaskRegistry) RegisterTypedTask(taskType TaskType, creator TypedTaskCreator) {
+ r.creators[taskType] = creator
+}
+
+// CreateTypedTask creates a new typed task instance
+func (r *TypedTaskRegistry) CreateTypedTask(taskType TaskType) (TypedTaskInterface, error) {
+ creator, exists := r.creators[taskType]
+ if !exists {
+ return nil, ErrTaskTypeNotFound
+ }
+ return creator(), nil
+}
+
+// GetSupportedTypes returns all registered typed task types
+func (r *TypedTaskRegistry) GetSupportedTypes() []TaskType {
+ types := make([]TaskType, 0, len(r.creators))
+ for taskType := range r.creators {
+ types = append(types, taskType)
+ }
+ return types
+}
+
+// Global typed task registry
+var globalTypedTaskRegistry = NewTypedTaskRegistry()
+
+// RegisterGlobalTypedTask registers a typed task globally
+func RegisterGlobalTypedTask(taskType TaskType, creator TypedTaskCreator) {
+ globalTypedTaskRegistry.RegisterTypedTask(taskType, creator)
+}
+
+// GetGlobalTypedTaskRegistry returns the global typed task registry
+func GetGlobalTypedTaskRegistry() *TypedTaskRegistry {
+ return globalTypedTaskRegistry
+}
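Typical wiring, sketched: a task package registers its constructor at init time, and the worker later instantiates by type. NewTask stands in for whatever constructor the task package exposes:

    func init() {
        types.RegisterGlobalTypedTask(types.TaskTypeVacuum, func() types.TypedTaskInterface {
            return NewTask() // hypothetical constructor returning a TypedTaskInterface
        })
    }

    // At execution time, inside the worker:
    task, err := types.GetGlobalTypedTaskRegistry().CreateTypedTask(types.TaskTypeVacuum)
    if err != nil {
        // err is types.ErrTaskTypeNotFound when nothing is registered for the type
    }
    _ = task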
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index 3b7899f07..ff6b87808 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -1,12 +1,17 @@
package worker
import (
+ "crypto/rand"
"fmt"
+ "net"
"os"
+ "path/filepath"
+ "strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
@@ -31,6 +36,7 @@ type Worker struct {
tasksFailed int
heartbeatTicker *time.Ticker
requestTicker *time.Ticker
+ taskLogHandler *tasks.TaskLogHandler
}
// AdminClient defines the interface for communicating with the admin server
@@ -41,30 +47,113 @@ type AdminClient interface {
SendHeartbeat(workerID string, status *types.WorkerStatus) error
RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error)
CompleteTask(taskID string, success bool, errorMsg string) error
+ CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error
UpdateTaskProgress(taskID string, progress float64) error
IsConnected() bool
}
+// GenerateOrLoadWorkerID generates a unique worker ID or loads existing one from working directory
+func GenerateOrLoadWorkerID(workingDir string) (string, error) {
+ const workerIDFile = "worker.id"
+
+ var idFilePath string
+ if workingDir != "" {
+ idFilePath = filepath.Join(workingDir, workerIDFile)
+ } else {
+ // Use current working directory if none specified
+ wd, err := os.Getwd()
+ if err != nil {
+ return "", fmt.Errorf("failed to get working directory: %w", err)
+ }
+ idFilePath = filepath.Join(wd, workerIDFile)
+ }
+
+ // Try to read existing worker ID
+ if data, err := os.ReadFile(idFilePath); err == nil {
+ workerID := strings.TrimSpace(string(data))
+ if workerID != "" {
+ glog.Infof("Loaded existing worker ID from %s: %s", idFilePath, workerID)
+ return workerID, nil
+ }
+ }
+
+ // Generate new unique worker ID with host information
+ hostname, _ := os.Hostname()
+ if hostname == "" {
+ hostname = "unknown"
+ }
+
+ // Get local IP address for better host identification
+ var hostIP string
+ if addrs, err := net.InterfaceAddrs(); err == nil {
+ for _, addr := range addrs {
+ if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+ if ipnet.IP.To4() != nil {
+ hostIP = ipnet.IP.String()
+ break
+ }
+ }
+ }
+ }
+ if hostIP == "" {
+ hostIP = "noip"
+ }
+
+ // Create host identifier combining hostname and IP
+ hostID := fmt.Sprintf("%s@%s", hostname, hostIP)
+
+ // Generate random component for uniqueness
+ randomBytes := make([]byte, 4)
+ var workerID string
+ if _, err := rand.Read(randomBytes); err != nil {
+ // Fallback to timestamp if crypto/rand fails
+ workerID = fmt.Sprintf("worker-%s-%d", hostID, time.Now().Unix())
+ glog.Infof("Generated fallback worker ID: %s", workerID)
+ } else {
+ // Use random bytes + timestamp for uniqueness
+ randomHex := fmt.Sprintf("%x", randomBytes)
+ timestamp := time.Now().Unix()
+ workerID = fmt.Sprintf("worker-%s-%s-%d", hostID, randomHex, timestamp)
+ glog.Infof("Generated new worker ID: %s", workerID)
+ }
+
+ // Save worker ID to file
+ if err := os.WriteFile(idFilePath, []byte(workerID), 0644); err != nil {
+ glog.Warningf("Failed to save worker ID to %s: %v", idFilePath, err)
+ } else {
+ glog.Infof("Saved worker ID to %s", idFilePath)
+ }
+
+ return workerID, nil
+}
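Usage is a single call; the ID survives restarts because it is persisted to worker.id in the chosen directory (the path below is a placeholder):

    id, err := GenerateOrLoadWorkerID("/var/lib/seaweedfs-worker")
    if err != nil {
        glog.Fatalf("cannot establish worker identity: %v", err)
    }
    glog.Infof("worker identity: %s", id)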
+
// NewWorker creates a new worker instance
func NewWorker(config *types.WorkerConfig) (*Worker, error) {
if config == nil {
config = types.DefaultWorkerConfig()
}
- // Always auto-generate worker ID
- hostname, _ := os.Hostname()
- workerID := fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix())
+ // Generate or load persistent worker ID
+ workerID, err := GenerateOrLoadWorkerID(config.BaseWorkingDir)
+ if err != nil {
+ return nil, fmt.Errorf("failed to generate or load worker ID: %w", err)
+ }
// Use the global registry that already has all tasks registered
registry := tasks.GetGlobalRegistry()
+ // Initialize task log handler
+ logDir := filepath.Join(config.BaseWorkingDir, "task_logs")
+ taskLogHandler := tasks.NewTaskLogHandler(logDir)
+
worker := &Worker{
- id: workerID,
- config: config,
- registry: registry,
- currentTasks: make(map[string]*types.Task),
- stopChan: make(chan struct{}),
- startTime: time.Now(),
+ id: workerID,
+ config: config,
+ registry: registry,
+ currentTasks: make(map[string]*types.Task),
+ stopChan: make(chan struct{}),
+ startTime: time.Now(),
+ taskLogHandler: taskLogHandler,
}
glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetSupportedTypes()))
@@ -72,6 +161,17 @@ func NewWorker(config *types.WorkerConfig) (*Worker, error) {
return worker, nil
}
+// getTaskLoggerConfig returns the task logger configuration with worker's log directory
+func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig {
+ config := tasks.DefaultTaskLoggerConfig()
+
+ // Use worker's configured log directory (BaseWorkingDir is guaranteed to be non-empty)
+ logDir := filepath.Join(w.config.BaseWorkingDir, "task_logs")
+ config.BaseLogDir = logDir
+
+ return config
+}
+
// ID returns the worker ID
func (w *Worker) ID() string {
return w.id
@@ -90,15 +190,10 @@ func (w *Worker) Start() error {
return fmt.Errorf("admin client is not set")
}
- // Connect to admin server
- if err := w.adminClient.Connect(); err != nil {
- return fmt.Errorf("failed to connect to admin server: %w", err)
- }
-
w.running = true
w.startTime = time.Now()
- // Register with admin server
+ // Prepare worker info for registration
workerInfo := &types.Worker{
ID: w.id,
Capabilities: w.config.Capabilities,
@@ -108,17 +203,33 @@ func (w *Worker) Start() error {
LastHeartbeat: time.Now(),
}
+ // Register worker info with client first (this stores it for use during connection)
if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
- w.running = false
- w.adminClient.Disconnect()
- return fmt.Errorf("failed to register worker: %w", err)
+ glog.V(1).Infof("Worker info stored for registration: %v", err)
+ // This is expected if not connected yet
}
- // Start worker loops
+ // Start connection attempt (will register immediately if successful)
+ glog.Infof("🚀 WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d",
+ w.id, w.config.Capabilities, w.config.MaxConcurrent)
+
+ // Try initial connection, but don't fail if it doesn't work immediately
+ if err := w.adminClient.Connect(); err != nil {
+ glog.Warningf("⚠️ INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err)
+ // Don't return error - let the reconnection loop handle it
+ } else {
+ glog.Infof("✅ INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id)
+ }
+
+ // Start worker loops regardless of initial connection status
+ // They will handle connection failures gracefully
+ glog.V(1).Infof("🔄 STARTING LOOPS: Worker %s starting background loops", w.id)
go w.heartbeatLoop()
go w.taskRequestLoop()
+ go w.connectionMonitorLoop()
+ go w.messageProcessingLoop()
- glog.Infof("Worker %s started", w.id)
+ glog.Infof("✅ WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id)
return nil
}
@@ -208,14 +319,25 @@ func (w *Worker) GetStatus() types.WorkerStatus {
// HandleTask handles a task execution
func (w *Worker) HandleTask(task *types.Task) error {
+ glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)",
+ w.id, task.ID, task.Type, task.VolumeID)
+
w.mutex.Lock()
- if len(w.currentTasks) >= w.config.MaxConcurrent {
+ currentLoad := len(w.currentTasks)
+ if currentLoad >= w.config.MaxConcurrent {
w.mutex.Unlock()
+ glog.Errorf("❌ TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
+ w.id, currentLoad, w.config.MaxConcurrent, task.ID)
return fmt.Errorf("worker is at capacity")
}
+
w.currentTasks[task.ID] = task
+ newLoad := len(w.currentTasks)
w.mutex.Unlock()
+ glog.Infof("✅ TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
+ w.id, task.ID, newLoad, w.config.MaxConcurrent)
+
// Execute task in goroutine
go w.executeTask(task)
@@ -249,40 +371,95 @@ func (w *Worker) SetAdminClient(client AdminClient) {
// executeTask executes a task
func (w *Worker) executeTask(task *types.Task) {
+ startTime := time.Now()
+
defer func() {
w.mutex.Lock()
delete(w.currentTasks, task.ID)
+ currentLoad := len(w.currentTasks)
w.mutex.Unlock()
+
+ duration := time.Since(startTime)
+ glog.Infof("🏁 TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d",
+ w.id, task.ID, duration, currentLoad, w.config.MaxConcurrent)
}()
- glog.Infof("Worker %s executing task %s: %s", w.id, task.ID, task.Type)
+ glog.Infof("🚀 TASK EXECUTION STARTED: Worker %s starting execution of task %s (type: %s, volume: %d, server: %s, collection: %s) at %v",
+ w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339))
- // Create task instance
- taskParams := types.TaskParams{
- VolumeID: task.VolumeID,
- Server: task.Server,
- Collection: task.Collection,
- Parameters: task.Parameters,
+ // Report task start to admin server
+ if err := w.adminClient.UpdateTaskProgress(task.ID, 0.0); err != nil {
+ glog.V(1).Infof("Failed to report task start to admin: %v", err)
}
- taskInstance, err := w.registry.CreateTask(task.Type, taskParams)
+ // Determine task-specific working directory (BaseWorkingDir is guaranteed to be non-empty)
+ taskWorkingDir := filepath.Join(w.config.BaseWorkingDir, string(task.Type))
+ glog.V(2).Infof("📁 WORKING DIRECTORY: Task %s using working directory: %s", task.ID, taskWorkingDir)
+
+ // Check if we have typed protobuf parameters
+ if task.TypedParams == nil {
+ w.completeTask(task.ID, false, "task has no typed parameters - task was not properly planned")
+ glog.Errorf("Worker %s rejecting task %s: no typed parameters", w.id, task.ID)
+ return
+ }
+
+ // Use typed task execution (all tasks should be typed)
+ glog.V(1).Infof("Executing task %s with typed protobuf parameters", task.ID)
+
+ typedRegistry := types.GetGlobalTypedTaskRegistry()
+ typedTaskInstance, err := typedRegistry.CreateTypedTask(task.Type)
if err != nil {
- w.completeTask(task.ID, false, fmt.Sprintf("failed to create task: %v", err))
+ w.completeTask(task.ID, false, fmt.Sprintf("typed task not available for %s: %v", task.Type, err))
+ glog.Errorf("Worker %s failed to create typed task %s: %v", w.id, task.ID, err)
return
}
- // Execute task
- err = taskInstance.Execute(taskParams)
+ // Configure task logger directory (all typed tasks support this)
+ tasksLoggerConfig := w.getTaskLoggerConfig()
+ typedLoggerConfig := types.TaskLoggerConfig{
+ BaseLogDir: tasksLoggerConfig.BaseLogDir,
+ MaxTasks: tasksLoggerConfig.MaxTasks,
+ MaxLogSizeMB: tasksLoggerConfig.MaxLogSizeMB,
+ EnableConsole: tasksLoggerConfig.EnableConsole,
+ }
+ typedTaskInstance.SetLoggerConfig(typedLoggerConfig)
+ glog.V(2).Infof("Set typed task logger config for %s: %s", task.ID, typedLoggerConfig.BaseLogDir)
+
+ // Initialize logging (all typed tasks support this)
+ taskParams := types.TaskParams{
+ VolumeID: task.VolumeID,
+ Server: task.Server,
+ Collection: task.Collection,
+ WorkingDir: taskWorkingDir,
+ TypedParams: task.TypedParams,
+ GrpcDialOption: w.config.GrpcDialOption,
+ }
+
+ if err := typedTaskInstance.InitializeTaskLogger(task.ID, w.id, taskParams); err != nil {
+ glog.Warningf("Failed to initialize task logger for %s: %v", task.ID, err)
+ }
+
+ // Set progress callback that reports to admin server
+ typedTaskInstance.SetProgressCallback(func(progress float64) {
+ // Report progress updates to admin server
+ glog.V(2).Infof("Task %s progress: %.1f%%", task.ID, progress)
+ if err := w.adminClient.UpdateTaskProgress(task.ID, progress); err != nil {
+ glog.V(1).Infof("Failed to report task progress to admin: %v", err)
+ }
+ })
+
+ // Execute typed task
+ err = typedTaskInstance.ExecuteTyped(task.TypedParams)
// Report completion
if err != nil {
w.completeTask(task.ID, false, err.Error())
w.tasksFailed++
- glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
+ glog.Errorf("Worker %s failed to execute typed task %s: %v", w.id, task.ID, err)
} else {
w.completeTask(task.ID, true, "")
w.tasksCompleted++
- glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
+ glog.Infof("Worker %s completed typed task %s successfully", w.id, task.ID)
}
}
@@ -348,20 +525,29 @@ func (w *Worker) requestTasks() {
w.mutex.RUnlock()
if currentLoad >= w.config.MaxConcurrent {
+ glog.V(3).Infof("🚫 TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)",
+ w.id, currentLoad, w.config.MaxConcurrent)
return // Already at capacity
}
if w.adminClient != nil {
+ glog.V(3).Infof("📞 REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)",
+ w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities)
+
task, err := w.adminClient.RequestTask(w.id, w.config.Capabilities)
if err != nil {
- glog.V(2).Infof("Failed to request task: %v", err)
+ glog.V(2).Infof("❌ TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err)
return
}
if task != nil {
+ glog.Infof("📨 TASK RESPONSE RECEIVED: Worker %s received task from admin server - ID: %s, Type: %s",
+ w.id, task.ID, task.Type)
if err := w.HandleTask(task); err != nil {
- glog.Errorf("Failed to handle task: %v", err)
+ glog.Errorf("❌ TASK HANDLING FAILED: Worker %s failed to handle task %s: %v", w.id, task.ID, err)
}
+ } else {
+ glog.V(3).Infof("📭 NO TASK AVAILABLE: Worker %s - admin server has no tasks available", w.id)
}
}
}
@@ -383,6 +569,59 @@ func (w *Worker) GetCurrentTasks() map[string]*types.Task {
return tasks
}
+// registerWorker registers the worker with the admin server
+func (w *Worker) registerWorker() {
+ workerInfo := &types.Worker{
+ ID: w.id,
+ Capabilities: w.config.Capabilities,
+ MaxConcurrent: w.config.MaxConcurrent,
+ Status: "active",
+ CurrentLoad: 0,
+ LastHeartbeat: time.Now(),
+ }
+
+ if err := w.adminClient.RegisterWorker(workerInfo); err != nil {
+ glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err)
+ } else {
+ glog.Infof("Worker %s registered successfully with admin server", w.id)
+ }
+}
+
+// connectionMonitorLoop monitors connection status
+func (w *Worker) connectionMonitorLoop() {
+ glog.V(1).Infof("🔍 CONNECTION MONITOR STARTED: Worker %s connection monitor loop started", w.id)
+ ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
+ defer ticker.Stop()
+
+ lastConnectionStatus := false
+
+ for {
+ select {
+ case <-w.stopChan:
+ glog.V(1).Infof("🛑 CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id)
+ return
+ case <-ticker.C:
+ // Monitor connection status and log changes
+ currentConnectionStatus := w.adminClient != nil && w.adminClient.IsConnected()
+
+ if currentConnectionStatus != lastConnectionStatus {
+ if currentConnectionStatus {
+ glog.Infof("🔗 CONNECTION RESTORED: Worker %s connection status changed: connected", w.id)
+ } else {
+ glog.Warningf("⚠️ CONNECTION LOST: Worker %s connection status changed: disconnected", w.id)
+ }
+ lastConnectionStatus = currentConnectionStatus
+ } else {
+ if currentConnectionStatus {
+ glog.V(3).Infof("✅ CONNECTION OK: Worker %s connection status: connected", w.id)
+ } else {
+ glog.V(1).Infof("🔌 CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id)
+ }
+ }
+ }
+ }
+}
+
// GetConfig returns the worker configuration
func (w *Worker) GetConfig() *types.WorkerConfig {
return w.config
@@ -408,3 +647,158 @@ func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
SuccessRate: successRate,
}
}
+
+// messageProcessingLoop processes incoming admin messages
+func (w *Worker) messageProcessingLoop() {
+ glog.Infof("🔄 MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)
+
+ // Get access to the incoming message channel from gRPC client
+ grpcClient, ok := w.adminClient.(*GrpcAdminClient)
+ if !ok {
+ glog.Warningf("⚠️ MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id)
+ return
+ }
+
+ incomingChan := grpcClient.GetIncomingChannel()
+ glog.V(1).Infof("📡 MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id)
+
+ for {
+ select {
+ case <-w.stopChan:
+ glog.Infof("🛑 MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id)
+ return
+ case message := <-incomingChan:
+ if message != nil {
+ glog.V(3).Infof("📥 MESSAGE PROCESSING: Worker %s processing incoming message", w.id)
+ w.processAdminMessage(message)
+ } else {
+ glog.V(3).Infof("📭 NULL MESSAGE: Worker %s received nil message", w.id)
+ }
+ }
+ }
+}
+
+// processAdminMessage processes different types of admin messages
+func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) {
+ glog.V(4).Infof("📫 ADMIN MESSAGE RECEIVED: Worker %s received admin message: %T", w.id, message.Message)
+
+ switch msg := message.Message.(type) {
+ case *worker_pb.AdminMessage_RegistrationResponse:
+ glog.V(2).Infof("✅ REGISTRATION RESPONSE: Worker %s received registration response", w.id)
+ w.handleRegistrationResponse(msg.RegistrationResponse)
+ case *worker_pb.AdminMessage_HeartbeatResponse:
+ glog.V(3).Infof("💓 HEARTBEAT RESPONSE: Worker %s received heartbeat response", w.id)
+ w.handleHeartbeatResponse(msg.HeartbeatResponse)
+ case *worker_pb.AdminMessage_TaskLogRequest:
+ glog.V(1).Infof("📋 TASK LOG REQUEST: Worker %s received task log request for task %s", w.id, msg.TaskLogRequest.TaskId)
+ w.handleTaskLogRequest(msg.TaskLogRequest)
+ case *worker_pb.AdminMessage_TaskAssignment:
+ taskAssign := msg.TaskAssignment
+ if taskAssign.Params == nil {
+ glog.Errorf("Worker %s received task assignment %s without parameters", w.id, taskAssign.TaskId)
+ return
+ }
+ glog.V(1).Infof("Worker %s received direct task assignment %s (type: %s, volume: %d)",
+ w.id, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
+
+ // Convert to task and handle it
+ task := &types.Task{
+ ID: taskAssign.TaskId,
+ Type: types.TaskType(taskAssign.TaskType),
+ Status: types.TaskStatusAssigned,
+ VolumeID: taskAssign.Params.VolumeId,
+ Server: taskAssign.Params.Server,
+ Collection: taskAssign.Params.Collection,
+ Priority: types.TaskPriority(taskAssign.Priority),
+ CreatedAt: time.Unix(taskAssign.CreatedTime, 0),
+ TypedParams: taskAssign.Params,
+ }
+
+ if err := w.HandleTask(task); err != nil {
+ glog.Errorf("❌ DIRECT TASK ASSIGNMENT FAILED: Worker %s failed to handle direct task assignment %s: %v", w.id, task.ID, err)
+ }
+ case *worker_pb.AdminMessage_TaskCancellation:
+ glog.Infof("🛑 TASK CANCELLATION: Worker %s received task cancellation for task %s", w.id, msg.TaskCancellation.TaskId)
+ w.handleTaskCancellation(msg.TaskCancellation)
+ case *worker_pb.AdminMessage_AdminShutdown:
+ glog.Infof("🔄 ADMIN SHUTDOWN: Worker %s received admin shutdown message", w.id)
+ w.handleAdminShutdown(msg.AdminShutdown)
+ default:
+ glog.V(1).Infof("❓ UNKNOWN MESSAGE: Worker %s received unknown admin message type: %T", w.id, message.Message)
+ }
+}
+
+// handleTaskLogRequest processes task log requests from admin server
+func (w *Worker) handleTaskLogRequest(request *worker_pb.TaskLogRequest) {
+ glog.V(1).Infof("Worker %s handling task log request for task %s", w.id, request.TaskId)
+
+ // Use the task log handler to process the request
+ response := w.taskLogHandler.HandleLogRequest(request)
+
+ // Send response back to admin server
+ responseMsg := &worker_pb.WorkerMessage{
+ WorkerId: w.id,
+ Timestamp: time.Now().Unix(),
+ Message: &worker_pb.WorkerMessage_TaskLogResponse{
+ TaskLogResponse: response,
+ },
+ }
+
+ grpcClient, ok := w.adminClient.(*GrpcAdminClient)
+ if !ok {
+ glog.Errorf("Cannot send task log response: admin client is not gRPC client")
+ return
+ }
+
+ select {
+ case grpcClient.outgoing <- responseMsg:
+ glog.V(1).Infof("Task log response sent for task %s", request.TaskId)
+ case <-time.After(5 * time.Second):
+ glog.Errorf("Failed to send task log response for task %s: timeout", request.TaskId)
+ }
+}
+
+// handleTaskCancellation processes task cancellation requests
+func (w *Worker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) {
+ glog.Infof("Worker %s received task cancellation for task %s", w.id, cancellation.TaskId)
+
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ if task, exists := w.currentTasks[cancellation.TaskId]; exists {
+ // TODO: Implement task cancellation logic
+ glog.Infof("Cancelling task %s", task.ID)
+ } else {
+ glog.Warningf("Cannot cancel task %s: task not found", cancellation.TaskId)
+ }
+}
+
+// handleAdminShutdown processes admin shutdown notifications
+func (w *Worker) handleAdminShutdown(shutdown *worker_pb.AdminShutdown) {
+ glog.Infof("Worker %s received admin shutdown notification: %s", w.id, shutdown.Reason)
+
+ gracefulSeconds := shutdown.GracefulShutdownSeconds
+ if gracefulSeconds > 0 {
+ glog.Infof("Graceful shutdown in %d seconds", gracefulSeconds)
+ time.AfterFunc(time.Duration(gracefulSeconds)*time.Second, func() {
+ w.Stop()
+ })
+ } else {
+ // Immediate shutdown
+ go w.Stop()
+ }
+}
+
+// handleRegistrationResponse processes registration response from admin server
+func (w *Worker) handleRegistrationResponse(response *worker_pb.RegistrationResponse) {
+ glog.V(2).Infof("Worker %s processed registration response: success=%v", w.id, response.Success)
+ if !response.Success {
+ glog.Warningf("Worker %s registration failed: %s", w.id, response.Message)
+ }
+ // Registration responses are typically handled by the gRPC client during connection setup
+ // No additional action needed here
+}
+
+// handleHeartbeatResponse processes heartbeat response from admin server
+func (w *Worker) handleHeartbeatResponse(response *worker_pb.HeartbeatResponse) {
+ glog.V(4).Infof("Worker %s processed heartbeat response", w.id)
+ // Heartbeat responses are mainly for keeping the connection alive
+ // The admin may include configuration updates or status information in the future
+ // For now, just acknowledge receipt
+}
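Putting the pieces together, a sketch of the worker lifecycle as this change shapes it (the admin client construction is elided; any AdminClient implementation works, and per this change initial connection failures no longer abort Start):

    config := types.DefaultWorkerConfig()
    config.BaseWorkingDir = "/var/lib/seaweedfs-worker" // placeholder path

    w, err := NewWorker(config)
    if err != nil {
        glog.Fatalf("worker init: %v", err)
    }
    w.SetAdminClient(adminClient) // e.g. a *GrpcAdminClient
    if err := w.Start(); err != nil {
        glog.Fatalf("worker start: %v", err) // e.g. when no admin client is set
    }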