diff options
Diffstat (limited to 'weed/worker/tasks/balance/execution.go')
| -rw-r--r-- | weed/worker/tasks/balance/execution.go | 72 |
1 files changed, 37 insertions, 35 deletions
diff --git a/weed/worker/tasks/balance/execution.go b/weed/worker/tasks/balance/execution.go index 91cd912f0..0acd2b662 100644 --- a/weed/worker/tasks/balance/execution.go +++ b/weed/worker/tasks/balance/execution.go @@ -15,15 +15,13 @@ type TypedTask struct { *base.BaseTypedTask // Task state from protobuf - sourceServer string - destNode string - volumeID uint32 - collection string - estimatedSize uint64 - placementScore float64 - forceMove bool - timeoutSeconds int32 - placementConflicts []string + sourceServer string + destNode string + volumeID uint32 + collection string + estimatedSize uint64 + forceMove bool + timeoutSeconds int32 } // NewTypedTask creates a new typed balance task @@ -47,14 +45,20 @@ func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error { return fmt.Errorf("balance_params is required for balance task") } - // Validate destination node - if balanceParams.DestNode == "" { - return fmt.Errorf("dest_node is required for balance task") + // Validate sources and targets + if len(params.Sources) == 0 { + return fmt.Errorf("at least one source is required for balance task") + } + if len(params.Targets) == 0 { + return fmt.Errorf("at least one target is required for balance task") } - // Validate estimated size - if balanceParams.EstimatedSize == 0 { - return fmt.Errorf("estimated_size must be greater than 0") + // Validate that source and target have volume IDs + if params.Sources[0].VolumeId == 0 { + return fmt.Errorf("source volume_id is required for balance task") + } + if params.Targets[0].VolumeId == 0 { + return fmt.Errorf("target volume_id is required for balance task") } // Validate timeout @@ -73,10 +77,13 @@ func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duratio if balanceParams.TimeoutSeconds > 0 { return time.Duration(balanceParams.TimeoutSeconds) * time.Second } + } - // Estimate based on volume size (1 minute per GB) - if balanceParams.EstimatedSize > 0 { - gbSize := balanceParams.EstimatedSize / (1024 * 1024 * 1024) + // Estimate based on volume size from sources (1 minute per GB) + if len(params.Sources) > 0 { + source := params.Sources[0] + if source.EstimatedSize > 0 { + gbSize := source.EstimatedSize / (1024 * 1024 * 1024) return time.Duration(gbSize) * time.Minute } } @@ -89,35 +96,30 @@ func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duratio func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error { // Extract basic parameters t.volumeID = params.VolumeId - t.sourceServer = params.Server t.collection = params.Collection + // Ensure sources and targets are present (should be guaranteed by validation) + if len(params.Sources) == 0 { + return fmt.Errorf("at least one source is required for balance task (ExecuteTyped)") + } + if len(params.Targets) == 0 { + return fmt.Errorf("at least one target is required for balance task (ExecuteTyped)") + } + + // Extract source and target information + t.sourceServer = params.Sources[0].Node + t.estimatedSize = params.Sources[0].EstimatedSize + t.destNode = params.Targets[0].Node // Extract balance-specific parameters balanceParams := params.GetBalanceParams() if balanceParams != nil { - t.destNode = balanceParams.DestNode - t.estimatedSize = balanceParams.EstimatedSize - t.placementScore = balanceParams.PlacementScore t.forceMove = balanceParams.ForceMove t.timeoutSeconds = balanceParams.TimeoutSeconds - t.placementConflicts = balanceParams.PlacementConflicts } glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)", t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize) - // Log placement information - if t.placementScore > 0 { - glog.V(1).Infof("Placement score: %.2f", t.placementScore) - } - if len(t.placementConflicts) > 0 { - glog.V(1).Infof("Placement conflicts: %v", t.placementConflicts) - if !t.forceMove { - return fmt.Errorf("placement conflicts detected and force_move is false: %v", t.placementConflicts) - } - glog.Warningf("Proceeding with balance despite conflicts (force_move=true): %v", t.placementConflicts) - } - // Simulate balance operation with progress updates steps := []struct { name string |
