diff options
Diffstat (limited to 'weed/worker/tasks/balance/balance.go')
| -rw-r--r-- | weed/worker/tasks/balance/balance.go | 65 |
1 files changed, 62 insertions, 3 deletions
diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go index ea867d950..0becb3415 100644 --- a/weed/worker/tasks/balance/balance.go +++ b/weed/worker/tasks/balance/balance.go @@ -1,6 +1,7 @@ package balance import ( + "context" "fmt" "time" @@ -15,6 +16,9 @@ type Task struct { server string volumeID uint32 collection string + + // Task parameters for accessing planned destinations + taskParams types.TaskParams } // NewTask creates a new balance task instance @@ -30,7 +34,31 @@ func NewTask(server string, volumeID uint32, collection string) *Task { // Execute executes the balance task func (t *Task) Execute(params types.TaskParams) error { - glog.Infof("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection) + // Use BaseTask.ExecuteTask to handle logging initialization + return t.ExecuteTask(context.Background(), params, t.executeImpl) +} + +// executeImpl is the actual balance implementation +func (t *Task) executeImpl(ctx context.Context, params types.TaskParams) error { + // Store task parameters for accessing planned destinations + t.taskParams = params + + // Get planned destination + destNode := t.getPlannedDestination() + if destNode != "" { + t.LogWithFields("INFO", "Starting balance task with planned destination", map[string]interface{}{ + "volume_id": t.volumeID, + "source": t.server, + "destination": destNode, + "collection": t.collection, + }) + } else { + t.LogWithFields("INFO", "Starting balance task without specific destination", map[string]interface{}{ + "volume_id": t.volumeID, + "server": t.server, + "collection": t.collection, + }) + } // Simulate balance operation with progress updates steps := []struct { @@ -46,18 +74,36 @@ func (t *Task) Execute(params types.TaskParams) error { } for _, step := range steps { + select { + case <-ctx.Done(): + t.LogWarning("Balance task cancelled during step: %s", step.name) + return ctx.Err() + default: + } + if t.IsCancelled() { + t.LogWarning("Balance task cancelled by request during step: %s", step.name) return fmt.Errorf("balance task cancelled") } - glog.V(1).Infof("Balance task step: %s", step.name) + t.LogWithFields("INFO", "Executing balance step", map[string]interface{}{ + "step": step.name, + "progress": step.progress, + "duration": step.duration.String(), + "volume_id": t.volumeID, + }) t.SetProgress(step.progress) // Simulate work time.Sleep(step.duration) } - glog.Infof("Balance task completed for volume %d on server %s", t.volumeID, t.server) + t.LogWithFields("INFO", "Balance task completed successfully", map[string]interface{}{ + "volume_id": t.volumeID, + "server": t.server, + "collection": t.collection, + "final_progress": 100.0, + }) return nil } @@ -72,6 +118,19 @@ func (t *Task) Validate(params types.TaskParams) error { return nil } +// getPlannedDestination extracts the planned destination node from task parameters +func (t *Task) getPlannedDestination() string { + if t.taskParams.TypedParams != nil { + if balanceParams := t.taskParams.TypedParams.GetBalanceParams(); balanceParams != nil { + if balanceParams.DestNode != "" { + glog.V(2).Infof("Found planned destination for volume %d: %s", t.volumeID, balanceParams.DestNode) + return balanceParams.DestNode + } + } + } + return "" +} + // EstimateTime estimates the time needed for the task func (t *Task) EstimateTime(params types.TaskParams) time.Duration { // Base time for balance operation |
