aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/balance/execution.go
blob: 0acd2b662030443bb4d0c7828ff8846e5e488ecb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package balance

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// TypedTask implements balance operation with typed protobuf parameters
type TypedTask struct {
	*base.BaseTypedTask

	// Task state from protobuf
	sourceServer   string
	destNode       string
	volumeID       uint32
	collection     string
	estimatedSize  uint64
	forceMove      bool
	timeoutSeconds int32
}

// NewTypedTask creates a new typed balance task
func NewTypedTask() types.TypedTaskInterface {
	task := &TypedTask{
		BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeBalance),
	}
	return task
}

// ValidateTyped validates the typed parameters for balance task
func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
	// Basic validation from base class
	if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
		return err
	}

	// Check that we have balance-specific parameters
	balanceParams := params.GetBalanceParams()
	if balanceParams == nil {
		return fmt.Errorf("balance_params is required for balance task")
	}

	// Validate sources and targets
	if len(params.Sources) == 0 {
		return fmt.Errorf("at least one source is required for balance task")
	}
	if len(params.Targets) == 0 {
		return fmt.Errorf("at least one target is required for balance task")
	}

	// Validate that source and target have volume IDs
	if params.Sources[0].VolumeId == 0 {
		return fmt.Errorf("source volume_id is required for balance task")
	}
	if params.Targets[0].VolumeId == 0 {
		return fmt.Errorf("target volume_id is required for balance task")
	}

	// Validate timeout
	if balanceParams.TimeoutSeconds <= 0 {
		return fmt.Errorf("timeout_seconds must be greater than 0")
	}

	return nil
}

// EstimateTimeTyped estimates the time needed for balance operation based on protobuf parameters
func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
	balanceParams := params.GetBalanceParams()
	if balanceParams != nil {
		// Use the timeout from parameters if specified
		if balanceParams.TimeoutSeconds > 0 {
			return time.Duration(balanceParams.TimeoutSeconds) * time.Second
		}
	}

	// Estimate based on volume size from sources (1 minute per GB)
	if len(params.Sources) > 0 {
		source := params.Sources[0]
		if source.EstimatedSize > 0 {
			gbSize := source.EstimatedSize / (1024 * 1024 * 1024)
			return time.Duration(gbSize) * time.Minute
		}
	}

	// Default estimation
	return 10 * time.Minute
}

// ExecuteTyped implements the balance operation with typed parameters
func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
	// Extract basic parameters
	t.volumeID = params.VolumeId
	t.collection = params.Collection

	// Ensure sources and targets are present (should be guaranteed by validation)
	if len(params.Sources) == 0 {
		return fmt.Errorf("at least one source is required for balance task (ExecuteTyped)")
	}
	if len(params.Targets) == 0 {
		return fmt.Errorf("at least one target is required for balance task (ExecuteTyped)")
	}

	// Extract source and target information
	t.sourceServer = params.Sources[0].Node
	t.estimatedSize = params.Sources[0].EstimatedSize
	t.destNode = params.Targets[0].Node
	// Extract balance-specific parameters
	balanceParams := params.GetBalanceParams()
	if balanceParams != nil {
		t.forceMove = balanceParams.ForceMove
		t.timeoutSeconds = balanceParams.TimeoutSeconds
	}

	glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)",
		t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize)

	// Simulate balance operation with progress updates
	steps := []struct {
		name     string
		duration time.Duration
		progress float64
	}{
		{"Analyzing cluster state", 2 * time.Second, 15},
		{"Verifying destination capacity", 1 * time.Second, 25},
		{"Starting volume migration", 1 * time.Second, 35},
		{"Moving volume data", 6 * time.Second, 75},
		{"Updating cluster metadata", 2 * time.Second, 95},
		{"Verifying balance completion", 1 * time.Second, 100},
	}

	for _, step := range steps {
		if t.IsCancelled() {
			return fmt.Errorf("balance task cancelled during: %s", step.name)
		}

		glog.V(1).Infof("Balance task step: %s", step.name)
		t.SetProgress(step.progress)

		// Simulate work
		time.Sleep(step.duration)
	}

	glog.Infof("Typed balance task completed successfully for volume %d: %s -> %s",
		t.volumeID, t.sourceServer, t.destNode)
	return nil
}

// Register the typed task in the global registry
func init() {
	types.RegisterGlobalTypedTask(types.TaskTypeBalance, NewTypedTask)
	glog.V(1).Infof("Registered typed balance task")
}