aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/balance/balance.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/balance/balance.go')
-rw-r--r--weed/worker/tasks/balance/balance.go65
1 files changed, 62 insertions, 3 deletions
diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go
index ea867d950..0becb3415 100644
--- a/weed/worker/tasks/balance/balance.go
+++ b/weed/worker/tasks/balance/balance.go
@@ -1,6 +1,7 @@
package balance
import (
+ "context"
"fmt"
"time"
@@ -15,6 +16,9 @@ type Task struct {
server string
volumeID uint32
collection string
+
+ // Task parameters for accessing planned destinations
+ taskParams types.TaskParams
}
// NewTask creates a new balance task instance
@@ -30,7 +34,31 @@ func NewTask(server string, volumeID uint32, collection string) *Task {
// Execute executes the balance task
func (t *Task) Execute(params types.TaskParams) error {
- glog.Infof("Starting balance task for volume %d on server %s (collection: %s)", t.volumeID, t.server, t.collection)
+ // Use BaseTask.ExecuteTask to handle logging initialization
+ return t.ExecuteTask(context.Background(), params, t.executeImpl)
+}
+
+// executeImpl is the actual balance implementation
+func (t *Task) executeImpl(ctx context.Context, params types.TaskParams) error {
+ // Store task parameters for accessing planned destinations
+ t.taskParams = params
+
+ // Get planned destination
+ destNode := t.getPlannedDestination()
+ if destNode != "" {
+ t.LogWithFields("INFO", "Starting balance task with planned destination", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "source": t.server,
+ "destination": destNode,
+ "collection": t.collection,
+ })
+ } else {
+ t.LogWithFields("INFO", "Starting balance task without specific destination", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "collection": t.collection,
+ })
+ }
// Simulate balance operation with progress updates
steps := []struct {
@@ -46,18 +74,36 @@ func (t *Task) Execute(params types.TaskParams) error {
}
for _, step := range steps {
+ select {
+ case <-ctx.Done():
+ t.LogWarning("Balance task cancelled during step: %s", step.name)
+ return ctx.Err()
+ default:
+ }
+
if t.IsCancelled() {
+ t.LogWarning("Balance task cancelled by request during step: %s", step.name)
return fmt.Errorf("balance task cancelled")
}
- glog.V(1).Infof("Balance task step: %s", step.name)
+ t.LogWithFields("INFO", "Executing balance step", map[string]interface{}{
+ "step": step.name,
+ "progress": step.progress,
+ "duration": step.duration.String(),
+ "volume_id": t.volumeID,
+ })
t.SetProgress(step.progress)
// Simulate work
time.Sleep(step.duration)
}
- glog.Infof("Balance task completed for volume %d on server %s", t.volumeID, t.server)
+ t.LogWithFields("INFO", "Balance task completed successfully", map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "collection": t.collection,
+ "final_progress": 100.0,
+ })
return nil
}
@@ -72,6 +118,19 @@ func (t *Task) Validate(params types.TaskParams) error {
return nil
}
+// getPlannedDestination extracts the planned destination node from task parameters
+func (t *Task) getPlannedDestination() string {
+ if t.taskParams.TypedParams != nil {
+ if balanceParams := t.taskParams.TypedParams.GetBalanceParams(); balanceParams != nil {
+ if balanceParams.DestNode != "" {
+ glog.V(2).Infof("Found planned destination for volume %d: %s", t.volumeID, balanceParams.DestNode)
+ return balanceParams.DestNode
+ }
+ }
+ }
+ return ""
+}
+
// EstimateTime estimates the time needed for the task
func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
// Base time for balance operation