aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/balance/execution.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/balance/execution.go')
-rw-r--r--weed/worker/tasks/balance/execution.go72
1 files changed, 37 insertions, 35 deletions
diff --git a/weed/worker/tasks/balance/execution.go b/weed/worker/tasks/balance/execution.go
index 91cd912f0..0acd2b662 100644
--- a/weed/worker/tasks/balance/execution.go
+++ b/weed/worker/tasks/balance/execution.go
@@ -15,15 +15,13 @@ type TypedTask struct {
*base.BaseTypedTask
// Task state from protobuf
- sourceServer string
- destNode string
- volumeID uint32
- collection string
- estimatedSize uint64
- placementScore float64
- forceMove bool
- timeoutSeconds int32
- placementConflicts []string
+ sourceServer string
+ destNode string
+ volumeID uint32
+ collection string
+ estimatedSize uint64
+ forceMove bool
+ timeoutSeconds int32
}
// NewTypedTask creates a new typed balance task
@@ -47,14 +45,20 @@ func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
return fmt.Errorf("balance_params is required for balance task")
}
- // Validate destination node
- if balanceParams.DestNode == "" {
- return fmt.Errorf("dest_node is required for balance task")
+ // Validate sources and targets
+ if len(params.Sources) == 0 {
+ return fmt.Errorf("at least one source is required for balance task")
+ }
+ if len(params.Targets) == 0 {
+ return fmt.Errorf("at least one target is required for balance task")
}
- // Validate estimated size
- if balanceParams.EstimatedSize == 0 {
- return fmt.Errorf("estimated_size must be greater than 0")
+ // Validate that source and target have volume IDs
+ if params.Sources[0].VolumeId == 0 {
+ return fmt.Errorf("source volume_id is required for balance task")
+ }
+ if params.Targets[0].VolumeId == 0 {
+ return fmt.Errorf("target volume_id is required for balance task")
}
// Validate timeout
@@ -73,10 +77,13 @@ func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duratio
if balanceParams.TimeoutSeconds > 0 {
return time.Duration(balanceParams.TimeoutSeconds) * time.Second
}
+ }
- // Estimate based on volume size (1 minute per GB)
- if balanceParams.EstimatedSize > 0 {
- gbSize := balanceParams.EstimatedSize / (1024 * 1024 * 1024)
+ // Estimate based on volume size from sources (1 minute per GB)
+ if len(params.Sources) > 0 {
+ source := params.Sources[0]
+ if source.EstimatedSize > 0 {
+ gbSize := source.EstimatedSize / (1024 * 1024 * 1024)
return time.Duration(gbSize) * time.Minute
}
}
@@ -89,35 +96,30 @@ func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duratio
func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
// Extract basic parameters
t.volumeID = params.VolumeId
- t.sourceServer = params.Server
t.collection = params.Collection
+ // Ensure sources and targets are present (should be guaranteed by validation)
+ if len(params.Sources) == 0 {
+ return fmt.Errorf("at least one source is required for balance task (ExecuteTyped)")
+ }
+ if len(params.Targets) == 0 {
+ return fmt.Errorf("at least one target is required for balance task (ExecuteTyped)")
+ }
+
+ // Extract source and target information
+ t.sourceServer = params.Sources[0].Node
+ t.estimatedSize = params.Sources[0].EstimatedSize
+ t.destNode = params.Targets[0].Node
// Extract balance-specific parameters
balanceParams := params.GetBalanceParams()
if balanceParams != nil {
- t.destNode = balanceParams.DestNode
- t.estimatedSize = balanceParams.EstimatedSize
- t.placementScore = balanceParams.PlacementScore
t.forceMove = balanceParams.ForceMove
t.timeoutSeconds = balanceParams.TimeoutSeconds
- t.placementConflicts = balanceParams.PlacementConflicts
}
glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)",
t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize)
- // Log placement information
- if t.placementScore > 0 {
- glog.V(1).Infof("Placement score: %.2f", t.placementScore)
- }
- if len(t.placementConflicts) > 0 {
- glog.V(1).Infof("Placement conflicts: %v", t.placementConflicts)
- if !t.forceMove {
- return fmt.Errorf("placement conflicts detected and force_move is false: %v", t.placementConflicts)
- }
- glog.Warningf("Proceeding with balance despite conflicts (force_move=true): %v", t.placementConflicts)
- }
-
// Simulate balance operation with progress updates
steps := []struct {
name string