aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/maintenance/maintenance_worker.go
blob: 8a87a8403bc4b80970c24f3175a2e192ac85e71f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
package maintenance

import (
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"

	// Import task packages to trigger their auto-registration
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

// TaskExecutor defines the function signature for task execution. It receives
// the worker service that is running the task and the task itself, and returns
// a non-nil error when execution fails.
type TaskExecutor func(*MaintenanceWorkerService, *MaintenanceTask) error

// TaskExecutorFactory creates a TaskExecutor. The factory itself takes no
// arguments; the worker service is passed to the returned executor each time
// it is invoked.
type TaskExecutorFactory func() TaskExecutor

// Package-level registry of task executor factories.
var (
	// taskExecutorFactories maps each task type to the factory that builds its executor.
	taskExecutorFactories = make(map[MaintenanceTaskType]TaskExecutorFactory)
	// executorRegistryMutex guards taskExecutorFactories.
	executorRegistryMutex sync.RWMutex
	// executorRegistryInitOnce makes the lazy factory registration run at most once.
	executorRegistryInitOnce sync.Once
)

// initializeExecutorFactories lazily registers the generic executor factory for
// every task type returned by the global types registry. The work runs at most
// once per process, guarded by executorRegistryInitOnce.
//
// A task type that already has a factory (for example one installed through
// RegisterTaskExecutorFactory before the first lookup) is left untouched, so
// lazy initialization never overwrites an explicit registration.
func initializeExecutorFactories() {
	executorRegistryInitOnce.Do(func() {
		// Get all registered task types from the global registry
		typesRegistry := tasks.GetGlobalTypesRegistry()

		var registered []MaintenanceTaskType
		for workerTaskType := range typesRegistry.GetAllDetectors() {
			// Convert types.TaskType to MaintenanceTaskType by string conversion
			taskType := MaintenanceTaskType(string(workerTaskType))

			// Check and insert under a single write lock so a concurrent
			// explicit registration cannot be clobbered in between.
			executorRegistryMutex.Lock()
			_, exists := taskExecutorFactories[taskType]
			if !exists {
				taskExecutorFactories[taskType] = createGenericTaskExecutor
			}
			executorRegistryMutex.Unlock()

			if exists {
				glog.V(2).Infof("Keeping existing executor factory for task type: %s", taskType)
				continue
			}
			glog.V(2).Infof("Registered executor factory for task type: %s", taskType)
			registered = append(registered, taskType)
		}

		glog.V(1).Infof("Dynamically registered generic task executor for %d task types: %v", len(registered), registered)
	})
}

// RegisterTaskExecutorFactory stores factory as the executor factory for
// taskType in the package-level registry, replacing any previous entry for
// that type.
func RegisterTaskExecutorFactory(taskType MaintenanceTaskType, factory TaskExecutorFactory) {
	// Hold the write lock only for the map update itself.
	executorRegistryMutex.Lock()
	taskExecutorFactories[taskType] = factory
	executorRegistryMutex.Unlock()

	glog.V(2).Infof("Registered executor factory for task type: %s", taskType)
}

// GetTaskExecutorFactory looks up the executor factory registered for taskType.
// The second result reports whether a factory was found.
func GetTaskExecutorFactory(taskType MaintenanceTaskType) (TaskExecutorFactory, bool) {
	// Trigger the lazy registration before consulting the registry.
	initializeExecutorFactories()

	executorRegistryMutex.RLock()
	registeredFactory, found := taskExecutorFactories[taskType]
	executorRegistryMutex.RUnlock()

	return registeredFactory, found
}

// GetSupportedExecutorTaskTypes lists every task type that currently has an
// executor factory in the package-level registry. The order of the returned
// slice follows map iteration and is therefore unspecified.
func GetSupportedExecutorTaskTypes() []MaintenanceTaskType {
	// Trigger the lazy registration before consulting the registry.
	initializeExecutorFactories()

	executorRegistryMutex.RLock()
	defer executorRegistryMutex.RUnlock()

	// Fill a slice sized to the registry by index rather than by append.
	supported := make([]MaintenanceTaskType, len(taskExecutorFactories))
	next := 0
	for registeredType := range taskExecutorFactories {
		supported[next] = registeredType
		next++
	}
	return supported
}

// createGenericTaskExecutor returns the generic executor, which forwards every
// task to MaintenanceWorkerService.executeGenericTask on the worker it is
// invoked with.
func createGenericTaskExecutor() TaskExecutor {
	// The method expression has exactly the TaskExecutor signature:
	// func(*MaintenanceWorkerService, *MaintenanceTask) error.
	return (*MaintenanceWorkerService).executeGenericTask
}

// init only emits a verbose log line; executor factories are registered lazily
// by initializeExecutorFactories on first access, not here.
func init() {
	const startupNote = "Maintenance worker initialized - executor factories will be registered on first access"
	glog.V(1).Infof(startupNote)
}

// MaintenanceWorkerService manages maintenance task execution for one worker.
// Once started it registers the worker with the configured queue, sends
// periodic heartbeats, polls the queue for new tasks and runs every accepted
// task in its own goroutine.
//
// The fields running, currentTasks and taskExecutors are shared between the
// worker loop, the per-task goroutines and callers of the public methods, so
// they are only accessed while holding mu. The remaining configuration fields
// (queue, capabilities, maxConcurrent, ...) are not synchronized; callers are
// expected to set them before Start.
type MaintenanceWorkerService struct {
	workerID      string
	address       string
	adminServer   string
	capabilities  []MaintenanceTaskType
	maxConcurrent int
	currentTasks  map[string]*MaintenanceTask
	queue         *MaintenanceQueue
	adminClient   AdminClient
	running       bool
	stopChan      chan struct{}

	// Task execution registry
	taskExecutors map[MaintenanceTaskType]TaskExecutor

	// Task registry for creating task instances
	taskRegistry *tasks.TaskRegistry

	// mu guards running, currentTasks and taskExecutors.
	mu sync.RWMutex

	// stopOnce makes sure stopChan is closed only once, so Stop can be
	// called repeatedly without panicking.
	stopOnce sync.Once
}

// NewMaintenanceWorkerService creates a new maintenance worker service.
// The worker advertises every registered maintenance task type as a capability,
// defaults to two concurrent tasks, and uses the global task registry to create
// task instances.
func NewMaintenanceWorkerService(workerID, address, adminServer string) *MaintenanceWorkerService {
	// Get all registered maintenance task types dynamically
	capabilities := GetRegisteredMaintenanceTaskTypes()

	worker := &MaintenanceWorkerService{
		workerID:      workerID,
		address:       address,
		adminServer:   adminServer,
		capabilities:  capabilities,
		maxConcurrent: 2, // Default concurrent task limit
		currentTasks:  make(map[string]*MaintenanceTask),
		stopChan:      make(chan struct{}),
		taskExecutors: make(map[MaintenanceTaskType]TaskExecutor),
		taskRegistry:  tasks.GetGlobalRegistry(), // Use global registry with auto-registered tasks
	}

	// Initialize task executor registry
	worker.initializeTaskExecutors()

	glog.V(1).Infof("Created maintenance worker with %d registered task types", len(worker.taskRegistry.GetSupportedTypes()))

	return worker
}

// executeGenericTask executes a task using the task registry instead of
// hardcoded methods. It builds the task parameters from the maintenance task,
// asks the registry for a task instance, runs it, and reports progress (5 when
// the task starts, 100 when it finishes). Any failure is returned wrapped so
// callers can still inspect the underlying error.
func (mws *MaintenanceWorkerService) executeGenericTask(task *MaintenanceTask) error {
	glog.V(2).Infof("Executing generic task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)

	// Convert MaintenanceTask to types.TaskType
	taskType := types.TaskType(string(task.Type))

	// Create task parameters
	taskParams := types.TaskParams{
		VolumeID:   task.VolumeID,
		Server:     task.Server,
		Collection: task.Collection,
		Parameters: task.Parameters,
	}

	// Create task instance using the registry
	taskInstance, err := mws.taskRegistry.CreateTask(taskType, taskParams)
	if err != nil {
		return fmt.Errorf("failed to create task instance: %w", err)
	}

	// Update progress to show task has started
	mws.updateTaskProgress(task.ID, 5)

	// Execute the task
	if err := taskInstance.Execute(taskParams); err != nil {
		return fmt.Errorf("task execution failed: %w", err)
	}

	// Update progress to show completion
	mws.updateTaskProgress(task.ID, 100)

	glog.V(2).Infof("Generic task %s completed successfully", task.ID)
	return nil
}

// initializeTaskExecutors rebuilds this worker's executor table from the
// package-level factory registry, creating one executor per registered factory.
func (mws *MaintenanceWorkerService) initializeTaskExecutors() {
	// Make sure the lazily-registered factories exist before reading the
	// registry. This must happen before taking executorRegistryMutex below,
	// because the lazy initialization acquires that mutex for writing.
	initializeExecutorFactories()

	executorRegistryMutex.RLock()
	defer executorRegistryMutex.RUnlock()

	mws.mu.Lock()
	defer mws.mu.Unlock()

	mws.taskExecutors = make(map[MaintenanceTaskType]TaskExecutor, len(taskExecutorFactories))
	for taskType, factory := range taskExecutorFactories {
		mws.taskExecutors[taskType] = factory()
		glog.V(3).Infof("Initialized executor for task type: %s", taskType)
	}

	glog.V(2).Infof("Initialized %d task executors", len(mws.taskExecutors))
}

// RegisterTaskExecutor allows dynamic registration of new task executors on
// this worker, replacing any executor already registered for taskType.
func (mws *MaintenanceWorkerService) RegisterTaskExecutor(taskType MaintenanceTaskType, executor TaskExecutor) {
	mws.mu.Lock()
	if mws.taskExecutors == nil {
		mws.taskExecutors = make(map[MaintenanceTaskType]TaskExecutor)
	}
	mws.taskExecutors[taskType] = executor
	mws.mu.Unlock()

	glog.V(1).Infof("Registered executor for task type: %s", taskType)
}

// GetSupportedTaskTypes returns the task types that have an executor factory in
// the package-level registry. Executors added only to this worker through
// RegisterTaskExecutor are not reflected in the result.
func (mws *MaintenanceWorkerService) GetSupportedTaskTypes() []MaintenanceTaskType {
	return GetSupportedExecutorTaskTypes()
}

// Start begins the worker service: it marks the worker as running, registers it
// with the queue (when one is set) and launches the worker loop in a new
// goroutine. It currently always returns nil.
func (mws *MaintenanceWorkerService) Start() error {
	mws.mu.Lock()
	mws.running = true
	mws.mu.Unlock()

	// Register with admin server
	worker := &MaintenanceWorker{
		ID:            mws.workerID,
		Address:       mws.address,
		Capabilities:  mws.capabilities,
		MaxConcurrent: mws.maxConcurrent,
	}

	if mws.queue != nil {
		mws.queue.RegisterWorker(worker)
	}

	// Start worker loop
	go mws.workerLoop()

	glog.Infof("Maintenance worker %s started at %s", mws.workerID, mws.address)
	return nil
}

// Stop terminates the worker service. It signals the worker loop to exit and
// then waits, polling once per second for up to 30 seconds, until all in-flight
// tasks have finished. Calling Stop more than once is safe.
func (mws *MaintenanceWorkerService) Stop() {
	mws.mu.Lock()
	mws.running = false
	mws.mu.Unlock()

	// Closing an already-closed channel panics, so guard the close.
	mws.stopOnce.Do(func() { close(mws.stopChan) })

	// Wait for current tasks to complete or timeout
	timeout := time.NewTimer(30 * time.Second)
	defer timeout.Stop()

	// One reusable ticker instead of a fresh time.After timer per iteration.
	poll := time.NewTicker(time.Second)
	defer poll.Stop()

	for mws.activeTaskCount() > 0 {
		select {
		case <-timeout.C:
			glog.Warningf("Worker %s stopping with %d tasks still running", mws.workerID, mws.activeTaskCount())
			return
		case <-poll.C:
			// Check again
		}
	}

	glog.Infof("Maintenance worker %s stopped", mws.workerID)
}

// activeTaskCount returns the number of tasks currently being executed.
func (mws *MaintenanceWorkerService) activeTaskCount() int {
	mws.mu.RLock()
	defer mws.mu.RUnlock()
	return len(mws.currentTasks)
}

// workerLoop is the main worker event loop. It sends a heartbeat every 30
// seconds and requests new tasks every 5 seconds until stopChan is closed.
func (mws *MaintenanceWorkerService) workerLoop() {
	heartbeatTicker := time.NewTicker(30 * time.Second)
	defer heartbeatTicker.Stop()

	taskRequestTicker := time.NewTicker(5 * time.Second)
	defer taskRequestTicker.Stop()

	// stopChan is the single shutdown signal; Stop closes it right after
	// clearing the running flag, so the flag need not be polled here.
	for {
		select {
		case <-mws.stopChan:
			return
		case <-heartbeatTicker.C:
			mws.sendHeartbeat()
		case <-taskRequestTicker.C:
			mws.requestTasks()
		}
	}
}

// sendHeartbeat sends heartbeat to admin server by updating this worker's
// heartbeat on the queue; it is a no-op when no queue is set.
func (mws *MaintenanceWorkerService) sendHeartbeat() {
	if mws.queue != nil {
		mws.queue.UpdateWorkerHeartbeat(mws.workerID)
	}
}

// requestTasks asks the queue for the next task matching this worker's
// capabilities and starts executing it. Nothing happens when the worker is
// already at its concurrency limit or when no queue is set.
func (mws *MaintenanceWorkerService) requestTasks() {
	if mws.activeTaskCount() >= mws.maxConcurrent {
		return // Already at capacity
	}

	if mws.queue == nil {
		return
	}

	if task := mws.queue.GetNextTask(mws.workerID, mws.capabilities); task != nil {
		mws.executeTask(task)
	}
}

// executeTask executes a maintenance task asynchronously. The task is recorded
// as in-flight immediately, then run in a new goroutine using the executor
// registered for its type; the outcome is reported to the queue (when set) and
// the task is removed from the in-flight set when the goroutine exits.
func (mws *MaintenanceWorkerService) executeTask(task *MaintenanceTask) {
	mws.mu.Lock()
	mws.currentTasks[task.ID] = task
	mws.mu.Unlock()

	go func() {
		defer func() {
			mws.mu.Lock()
			delete(mws.currentTasks, task.ID)
			mws.mu.Unlock()
		}()

		glog.Infof("Worker %s executing task %s: %s", mws.workerID, task.ID, task.Type)

		// Look up the executor under the lock, but run it without holding
		// the lock so long-running tasks never block other callers.
		mws.mu.RLock()
		executor, exists := mws.taskExecutors[task.Type]
		mws.mu.RUnlock()

		// Execute task using dynamic executor registry
		var err error
		if exists {
			err = executor(mws, task)
		} else {
			err = fmt.Errorf("unsupported task type: %s", task.Type)
			glog.Errorf("No executor registered for task type: %s", task.Type)
		}

		// Report task completion
		if mws.queue != nil {
			errorMsg := ""
			if err != nil {
				errorMsg = err.Error()
			}
			mws.queue.CompleteTask(task.ID, errorMsg)
		}

		if err != nil {
			glog.Errorf("Worker %s failed to execute task %s: %v", mws.workerID, task.ID, err)
		} else {
			glog.Infof("Worker %s completed task %s successfully", mws.workerID, task.ID)
		}
	}()
}

// updateTaskProgress updates the progress of a task on the queue; it is a
// no-op when no queue is set.
func (mws *MaintenanceWorkerService) updateTaskProgress(taskID string, progress float64) {
	if mws.queue != nil {
		mws.queue.UpdateTaskProgress(taskID, progress)
	}
}

// GetStatus returns the current status of the worker as a generic map.
// "task_details" holds a snapshot copy of the in-flight task map, so callers can
// iterate it safely while tasks start and finish; the *MaintenanceTask values
// themselves are still shared with the worker.
func (mws *MaintenanceWorkerService) GetStatus() map[string]interface{} {
	mws.mu.RLock()
	running := mws.running
	details := make(map[string]*MaintenanceTask, len(mws.currentTasks))
	for id, task := range mws.currentTasks {
		details[id] = task
	}
	mws.mu.RUnlock()

	return map[string]interface{}{
		"worker_id":      mws.workerID,
		"address":        mws.address,
		"running":        running,
		"capabilities":   mws.capabilities,
		"max_concurrent": mws.maxConcurrent,
		"current_tasks":  len(details),
		"task_details":   details,
	}
}

// SetQueue sets the maintenance queue for the worker.
// It is not synchronized with the worker loop; call it before Start.
func (mws *MaintenanceWorkerService) SetQueue(queue *MaintenanceQueue) {
	mws.queue = queue
}

// SetAdminClient sets the admin client for the worker.
func (mws *MaintenanceWorkerService) SetAdminClient(client AdminClient) {
	mws.adminClient = client
}

// SetCapabilities sets the worker capabilities.
// It is not synchronized with the worker loop; call it before Start.
func (mws *MaintenanceWorkerService) SetCapabilities(capabilities []MaintenanceTaskType) {
	mws.capabilities = capabilities
}

// SetMaxConcurrent sets the maximum concurrent tasks.
// It is not synchronized with the worker loop; call it before Start.
func (mws *MaintenanceWorkerService) SetMaxConcurrent(max int) {
	mws.maxConcurrent = max
}

// SetHeartbeatInterval sets the heartbeat interval (placeholder for future use).
// It currently ignores its argument; the worker loop uses a fixed interval.
func (mws *MaintenanceWorkerService) SetHeartbeatInterval(interval time.Duration) {
	// Future implementation for configurable heartbeat
}

// SetTaskRequestInterval sets the task request interval (placeholder for future use).
// It currently ignores its argument; the worker loop uses a fixed interval.
func (mws *MaintenanceWorkerService) SetTaskRequestInterval(interval time.Duration) {
	// Future implementation for configurable task requests
}

// MaintenanceWorkerCommand represents a standalone maintenance worker command.
// It wraps a single MaintenanceWorkerService that Run starts.
type MaintenanceWorkerCommand struct {
	// workerService is the worker started by Run.
	workerService *MaintenanceWorkerService
}

// NewMaintenanceWorkerCommand builds a worker command around a freshly created
// MaintenanceWorkerService configured with the given worker ID, address and
// admin server.
func NewMaintenanceWorkerCommand(workerID, address, adminServer string) *MaintenanceWorkerCommand {
	service := NewMaintenanceWorkerService(workerID, address, adminServer)

	command := new(MaintenanceWorkerCommand)
	command.workerService = service
	return command
}

// Run starts the maintenance worker as a standalone service.
//
// When the worker has no ID, one is generated as "worker-<hostname>-<unix
// seconds>"; if the hostname cannot be determined, "unknown" is used in its
// place. Run returns an error only when the worker fails to start. Otherwise
// it never returns: it blocks the calling goroutine indefinitely.
func (mwc *MaintenanceWorkerCommand) Run() error {
	// Generate worker ID if not provided
	if mwc.workerService.workerID == "" {
		hostname, err := os.Hostname()
		if err != nil || hostname == "" {
			// Fall back to a placeholder rather than producing an ID with an
			// empty host segment ("worker--<ts>").
			glog.Warningf("Unable to determine hostname for worker ID, using \"unknown\": %v", err)
			hostname = "unknown"
		}
		mwc.workerService.workerID = fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix())
	}

	// Start the worker service
	if err := mwc.workerService.Start(); err != nil {
		return fmt.Errorf("failed to start maintenance worker: %w", err)
	}

	// Block forever. No signal handling is installed here, so the worker
	// keeps running until the process is terminated externally.
	select {}
}