aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/balance/execution.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-08-01 11:18:32 -0700
committerGitHub <noreply@github.com>2025-08-01 11:18:32 -0700
commit0975968e71b05368d5f28f788cf863c2042c2696 (patch)
tree5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/tasks/balance/execution.go
parent1cba609bfa2306cc2885df212febd5ff954aa693 (diff)
downloadseaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz
seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip
admin: Refactor task destination planning (#7063)
* refactor planning into task detection * refactoring worker tasks * refactor * compiles, but only balance task is registered * compiles, but has nil exception * avoid nil logger * add back ec task * setting ec log directory * implement balance and vacuum tasks * EC tasks will no longer fail with "file not found" errors * Use ReceiveFile API to send locally generated shards * distributing shard files and ecx,ecj,vif files * generate .ecx files correctly * do not mount all possible EC shards (0-13) on every destination * use constants * delete all replicas * rename files * pass in volume size to tasks
Diffstat (limited to 'weed/worker/tasks/balance/execution.go')
-rw-r--r--weed/worker/tasks/balance/execution.go156
1 files changed, 156 insertions, 0 deletions
diff --git a/weed/worker/tasks/balance/execution.go b/weed/worker/tasks/balance/execution.go
new file mode 100644
index 000000000..91cd912f0
--- /dev/null
+++ b/weed/worker/tasks/balance/execution.go
@@ -0,0 +1,156 @@
+package balance
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TypedTask implements balance operation with typed protobuf parameters
+type TypedTask struct {
+ *base.BaseTypedTask
+
+ // Task state from protobuf
+ sourceServer string
+ destNode string
+ volumeID uint32
+ collection string
+ estimatedSize uint64
+ placementScore float64
+ forceMove bool
+ timeoutSeconds int32
+ placementConflicts []string
+}
+
+// NewTypedTask creates a new typed balance task
+func NewTypedTask() types.TypedTaskInterface {
+ task := &TypedTask{
+ BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeBalance),
+ }
+ return task
+}
+
+// ValidateTyped validates the typed parameters for balance task
+func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
+ // Basic validation from base class
+ if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
+ return err
+ }
+
+ // Check that we have balance-specific parameters
+ balanceParams := params.GetBalanceParams()
+ if balanceParams == nil {
+ return fmt.Errorf("balance_params is required for balance task")
+ }
+
+ // Validate destination node
+ if balanceParams.DestNode == "" {
+ return fmt.Errorf("dest_node is required for balance task")
+ }
+
+ // Validate estimated size
+ if balanceParams.EstimatedSize == 0 {
+ return fmt.Errorf("estimated_size must be greater than 0")
+ }
+
+ // Validate timeout
+ if balanceParams.TimeoutSeconds <= 0 {
+ return fmt.Errorf("timeout_seconds must be greater than 0")
+ }
+
+ return nil
+}
+
+// EstimateTimeTyped estimates the time needed for balance operation based on protobuf parameters
+func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
+ balanceParams := params.GetBalanceParams()
+ if balanceParams != nil {
+ // Use the timeout from parameters if specified
+ if balanceParams.TimeoutSeconds > 0 {
+ return time.Duration(balanceParams.TimeoutSeconds) * time.Second
+ }
+
+ // Estimate based on volume size (1 minute per GB)
+ if balanceParams.EstimatedSize > 0 {
+ gbSize := balanceParams.EstimatedSize / (1024 * 1024 * 1024)
+ return time.Duration(gbSize) * time.Minute
+ }
+ }
+
+ // Default estimation
+ return 10 * time.Minute
+}
+
+// ExecuteTyped implements the balance operation with typed parameters
+func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
+ // Extract basic parameters
+ t.volumeID = params.VolumeId
+ t.sourceServer = params.Server
+ t.collection = params.Collection
+
+ // Extract balance-specific parameters
+ balanceParams := params.GetBalanceParams()
+ if balanceParams != nil {
+ t.destNode = balanceParams.DestNode
+ t.estimatedSize = balanceParams.EstimatedSize
+ t.placementScore = balanceParams.PlacementScore
+ t.forceMove = balanceParams.ForceMove
+ t.timeoutSeconds = balanceParams.TimeoutSeconds
+ t.placementConflicts = balanceParams.PlacementConflicts
+ }
+
+ glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)",
+ t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize)
+
+ // Log placement information
+ if t.placementScore > 0 {
+ glog.V(1).Infof("Placement score: %.2f", t.placementScore)
+ }
+ if len(t.placementConflicts) > 0 {
+ glog.V(1).Infof("Placement conflicts: %v", t.placementConflicts)
+ if !t.forceMove {
+ return fmt.Errorf("placement conflicts detected and force_move is false: %v", t.placementConflicts)
+ }
+ glog.Warningf("Proceeding with balance despite conflicts (force_move=true): %v", t.placementConflicts)
+ }
+
+ // Simulate balance operation with progress updates
+ steps := []struct {
+ name string
+ duration time.Duration
+ progress float64
+ }{
+ {"Analyzing cluster state", 2 * time.Second, 15},
+ {"Verifying destination capacity", 1 * time.Second, 25},
+ {"Starting volume migration", 1 * time.Second, 35},
+ {"Moving volume data", 6 * time.Second, 75},
+ {"Updating cluster metadata", 2 * time.Second, 95},
+ {"Verifying balance completion", 1 * time.Second, 100},
+ }
+
+ for _, step := range steps {
+ if t.IsCancelled() {
+ return fmt.Errorf("balance task cancelled during: %s", step.name)
+ }
+
+ glog.V(1).Infof("Balance task step: %s", step.name)
+ t.SetProgress(step.progress)
+
+ // Simulate work
+ time.Sleep(step.duration)
+ }
+
+ glog.Infof("Typed balance task completed successfully for volume %d: %s -> %s",
+ t.volumeID, t.sourceServer, t.destNode)
+ return nil
+}
+
+// Register the typed task in the global registry
+func init() {
+ types.RegisterGlobalTypedTask(types.TaskTypeBalance, NewTypedTask)
+ glog.V(1).Infof("Registered typed balance task")
+}