aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/maintenance/maintenance_manager.go
blob: 5d87d817ed282ad6215e160bba6c502147273c30 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
package maintenance

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// MaintenanceManager coordinates the maintenance system. It owns the scanner
// that looks for maintenance work, the queue that holds tasks and workers, and
// the background scan and cleanup loops that Start launches.
type MaintenanceManager struct {
	// Collaborators and lifecycle state. In this file, running and stopChan
	// are read and written by Start, Stop, IsRunning, TriggerScan and the
	// background loops without holding mutex.
	config      *MaintenanceConfig
	scanner     *MaintenanceScanner
	queue       *MaintenanceQueue
	adminClient AdminClient
	running     bool
	stopChan    chan struct{}
	// Error handling and backoff. These fields are updated by performScan
	// (through handleScanError and resetErrorTracking) while it holds
	// mutex.Lock, and are read under mutex.RLock by scanLoop, GetStats and
	// GetErrorState.
	errorCount    int
	lastError     error
	lastErrorTime time.Time
	backoffDelay  time.Duration
	mutex         sync.RWMutex
}

// NewMaintenanceManager builds a manager wired to the given admin client.
//
// A nil config is replaced by DefaultMaintenanceConfig(). The queue and the
// scanner are both created from config.Policy, and the scanner is handed the
// same queue the manager keeps. The returned manager is not running; call
// Start to launch the background loops.
func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *MaintenanceManager {
	if config == nil {
		config = DefaultMaintenanceConfig()
	}

	mm := &MaintenanceManager{
		config:       config,
		adminClient:  adminClient,
		stopChan:     make(chan struct{}),
		backoffDelay: time.Second, // Initial backoff is one second
	}

	// The scanner needs the queue, so the queue has to exist first.
	mm.queue = NewMaintenanceQueue(config.Policy)
	mm.scanner = NewMaintenanceScanner(adminClient, config.Policy, mm.queue)

	return mm
}

// Start begins the maintenance manager.
//
// When the configuration has Enabled set to false, Start logs that fact and
// returns nil without starting anything. Otherwise it validates the configured
// durations (validateConfig replaces non-positive values with defaults), marks
// the manager as running and launches the scan and cleanup loops as background
// goroutines.
//
// Calling Start on a manager that is already running is a no-op that returns
// nil, so repeated calls do not spawn duplicate scan and cleanup loops. The
// running flag is not protected by a lock, so Start is not meant to be called
// concurrently with itself or with Stop.
//
// The returned error is non-nil only if configuration validation fails.
func (mm *MaintenanceManager) Start() error {
	if !mm.config.Enabled {
		glog.V(1).Infof("Maintenance system is disabled")
		return nil
	}

	// A second Start would otherwise launch another pair of loops that scan
	// and clean up in parallel with the first pair.
	if mm.running {
		glog.V(1).Infof("Maintenance manager is already running")
		return nil
	}

	// Validate configuration durations to prevent ticker panics
	if err := mm.validateConfig(); err != nil {
		return fmt.Errorf("invalid maintenance configuration: %w", err)
	}

	// Must be set before the loops start: both loops use it as their
	// loop condition.
	mm.running = true

	// Start background processes
	go mm.scanLoop()
	go mm.cleanupLoop()

	glog.Infof("Maintenance manager started with scan interval %ds", mm.config.ScanIntervalSeconds)
	return nil
}

// validateConfig sanitizes the duration settings of the current configuration.
//
// Every duration field that is zero or negative is reported with a warning and
// overwritten in place with its default. The function currently never fails and
// always returns nil.
func (mm *MaintenanceManager) validateConfig() error {
	// Fallback values, all expressed in seconds.
	const (
		defaultScanInterval    = 30 * 60          // 30 minutes
		defaultCleanupInterval = 24 * 60 * 60     // 24 hours
		defaultWorkerTimeout   = 5 * 60           // 5 minutes
		defaultTaskTimeout     = 2 * 60 * 60      // 2 hours
		defaultRetryDelay      = 15 * 60          // 15 minutes
		defaultTaskRetention   = 7 * 24 * 60 * 60 // 7 days
	)

	cfg := mm.config

	if cfg.ScanIntervalSeconds <= 0 {
		glog.Warningf("Invalid scan interval %ds, using default 30m", cfg.ScanIntervalSeconds)
		cfg.ScanIntervalSeconds = defaultScanInterval
	}
	if cfg.CleanupIntervalSeconds <= 0 {
		glog.Warningf("Invalid cleanup interval %ds, using default 24h", cfg.CleanupIntervalSeconds)
		cfg.CleanupIntervalSeconds = defaultCleanupInterval
	}
	if cfg.WorkerTimeoutSeconds <= 0 {
		glog.Warningf("Invalid worker timeout %ds, using default 5m", cfg.WorkerTimeoutSeconds)
		cfg.WorkerTimeoutSeconds = defaultWorkerTimeout
	}
	if cfg.TaskTimeoutSeconds <= 0 {
		glog.Warningf("Invalid task timeout %ds, using default 2h", cfg.TaskTimeoutSeconds)
		cfg.TaskTimeoutSeconds = defaultTaskTimeout
	}
	if cfg.RetryDelaySeconds <= 0 {
		glog.Warningf("Invalid retry delay %ds, using default 15m", cfg.RetryDelaySeconds)
		cfg.RetryDelaySeconds = defaultRetryDelay
	}
	if cfg.TaskRetentionSeconds <= 0 {
		glog.Warningf("Invalid task retention %ds, using default 168h", cfg.TaskRetentionSeconds)
		cfg.TaskRetentionSeconds = defaultTaskRetention
	}

	return nil
}

// IsRunning returns whether the maintenance manager is currently running.
//
// It reports the running flag, which Start sets to true and Stop sets to
// false. The flag is read without taking mutex.
func (mm *MaintenanceManager) IsRunning() bool {
	return mm.running
}

// Stop terminates the maintenance manager.
//
// It clears the running flag and closes stopChan, which makes the scan and
// cleanup loops return. Stop does not wait for those goroutines to finish.
//
// Calling Stop again after it has already closed the channel is a no-op
// instead of a "close of closed channel" panic. The check is not atomic, so
// Stop must still not be called from several goroutines at the same time.
func (mm *MaintenanceManager) Stop() {
	mm.running = false

	select {
	case <-mm.stopChan:
		// Nothing ever sends on stopChan, so a successful receive means a
		// previous Stop already closed it; closing again would panic.
		return
	default:
	}

	close(mm.stopChan)
	glog.Infof("Maintenance manager stopped")
}

// scanLoop periodically scans for maintenance tasks with adaptive timing.
//
// The loop ticks at the configured scan interval. After every scan it looks at
// the error state: while scans are failing, the ticker is switched to the
// current backoff delay (capped at ten times the configured interval); once a
// scan succeeds again, the ticker is switched back to the configured interval.
//
// The loop ends when stopChan is closed or when running is observed to be
// false at the top of an iteration.
func (mm *MaintenanceManager) scanLoop() {
	scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second

	// activeInterval is the period of the ticker currently in use. It differs
	// from scanInterval while the loop is backing off, and is what the next
	// interval must be compared against to decide whether to swap tickers.
	activeInterval := scanInterval
	ticker := time.NewTicker(activeInterval)
	// The ticker may be replaced below. Deferring through a closure stops
	// whichever ticker is current at exit, not just the first one.
	defer func() { ticker.Stop() }()

	for mm.running {
		select {
		case <-mm.stopChan:
			return
		case <-ticker.C:
			glog.V(1).Infof("Performing maintenance scan every %v", activeInterval)
			mm.performScan()

			// Choose the interval for the next tick from the error state
			// left behind by the scan that just ran.
			mm.mutex.RLock()
			nextInterval := scanInterval
			if mm.errorCount > 0 {
				// Use backoff delay when there are errors, but never wait
				// longer than the configured interval * 10.
				nextInterval = mm.backoffDelay
				if maxInterval := scanInterval * 10; nextInterval > maxInterval {
					nextInterval = maxInterval
				}
			}
			mm.mutex.RUnlock()

			// time.NewTicker panics on a non-positive duration; fall back to
			// the configured interval if the backoff delay was never set.
			if nextInterval <= 0 {
				nextInterval = scanInterval
			}

			// Swap the ticker whenever the desired period differs from the
			// one in use. This covers both entering backoff and returning to
			// the configured interval after recovery.
			if nextInterval != activeInterval {
				ticker.Stop()
				ticker = time.NewTicker(nextInterval)
				activeInterval = nextInterval
			}
		}
	}
}

// cleanupLoop runs performCleanup once per configured cleanup interval.
//
// The loop ends when stopChan is closed or when running is observed to be
// false at the top of an iteration.
func (mm *MaintenanceManager) cleanupLoop() {
	period := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
	cleanupTicker := time.NewTicker(period)
	defer cleanupTicker.Stop()

	for {
		if !mm.running {
			return
		}

		select {
		case <-cleanupTicker.C:
			mm.performCleanup()
		case <-mm.stopChan:
			return
		}
	}
}

// performScan runs one maintenance scan and records its outcome.
//
// The write lock on mutex is held for the whole call, including the scanner
// invocation and the enqueueing of results. Concurrent scans therefore run one
// at a time, and readers of the error state wait until the scan returns.
//
// On failure the error is passed to handleScanError and nothing is enqueued.
// On success the error tracking is reset and any results are added to the queue.
func (mm *MaintenanceManager) performScan() {
	mm.mutex.Lock()
	defer mm.mutex.Unlock()

	glog.V(2).Infof("Starting maintenance scan")

	found, scanErr := mm.scanner.ScanForMaintenanceTasks()
	if scanErr != nil {
		mm.handleScanError(scanErr)
		return
	}

	// A successful scan clears any accumulated failure state.
	mm.resetErrorTracking()

	if len(found) == 0 {
		glog.V(2).Infof("Maintenance scan completed: no tasks needed")
		return
	}

	mm.queue.AddTasksFromResults(found)
	glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(found))
}

// handleScanError records a failed scan, grows the backoff delay and logs the
// failure at a reduced rate.
//
// It mutates the error-tracking fields without locking; its caller in this
// file, performScan, holds mutex when it calls in.
//
// Backoff: from the second consecutive failure on, the delay doubles each
// time, capped at five minutes.
//
// Logging: failures 1-3 are logged as errors, as are failures 6, 9 and every
// multiple of 10. All other failures are logged at verbosity 3 as suppressed.
func (mm *MaintenanceManager) handleScanError(err error) {
	mm.errorCount++
	mm.lastError = err
	mm.lastErrorTime = time.Now()

	// The first failure keeps the current delay; later ones double it.
	if mm.errorCount > 1 {
		mm.backoffDelay *= 2
		if limit := 5 * time.Minute; mm.backoffDelay > limit {
			mm.backoffDelay = limit
		}
	}

	attempts := mm.errorCount
	shouldLog := attempts <= 3 || // the first three failures
		(attempts <= 10 && attempts%3 == 0) || // every 3rd failure up to the 10th
		attempts%10 == 0 // every 10th failure

	if !shouldLog {
		// Use debug level for suppressed errors
		glog.V(3).Infof("Maintenance scan failed (error #%d, suppressed): %v", mm.errorCount, err)
		return
	}

	// Connection problems get a message that mentions the retry/backoff state.
	switch {
	case !isConnectionError(err):
		glog.Errorf("Maintenance scan failed: %v", err)
	case mm.errorCount == 1:
		glog.Errorf("Maintenance scan failed: %v (will retry with backoff)", err)
	default:
		glog.Errorf("Maintenance scan still failing after %d attempts: %v (backoff: %v)",
			mm.errorCount, err, mm.backoffDelay)
	}
}

// resetErrorTracking clears the failure state after a successful scan.
//
// When no failures are recorded it does nothing. Otherwise it logs the
// recovery, zeroes the error count, drops the last error and restores the
// backoff delay to one second. lastErrorTime is left as it was.
//
// It does no locking itself; performScan calls it while holding mutex.
func (mm *MaintenanceManager) resetErrorTracking() {
	if mm.errorCount <= 0 {
		return
	}

	glog.V(1).Infof("Maintenance scan recovered after %d failed attempts", mm.errorCount)
	mm.errorCount = 0
	mm.lastError = nil
	mm.backoffDelay = time.Second // Back to the initial delay
}

// isConnectionError reports whether err looks like a connection problem.
//
// The decision is a case-sensitive substring match of err.Error() against a
// fixed list of phrases. A nil error is never a connection error.
func isConnectionError(err error) bool {
	if err == nil {
		return false
	}

	// Phrases whose presence in the error text marks a connection failure.
	markers := [...]string{
		"connection refused",
		"connection error",
		"dial tcp",
		"connection timeout",
		"no route to host",
		"network unreachable",
	}

	msg := err.Error()
	for _, marker := range markers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}

// performCleanup asks the queue to drop old tasks and stale workers.
//
// The task retention and worker timeout come from the current configuration,
// both given in seconds. A summary is logged only when something was removed.
func (mm *MaintenanceManager) performCleanup() {
	glog.V(2).Infof("Starting maintenance cleanup")

	retention := time.Duration(mm.config.TaskRetentionSeconds) * time.Second
	staleAfter := time.Duration(mm.config.WorkerTimeoutSeconds) * time.Second

	// Tasks are cleaned up first, then workers.
	tasksGone := mm.queue.CleanupOldTasks(retention)
	workersGone := mm.queue.RemoveStaleWorkers(staleAfter)

	if tasksGone <= 0 && workersGone <= 0 {
		return
	}
	glog.V(1).Infof("Cleanup completed: removed %d old tasks and %d stale workers", tasksGone, workersGone)
}

// GetQueue returns the maintenance queue.
//
// The returned pointer is the queue the manager itself uses, not a copy, so
// the caller and the manager operate on the same queue.
func (mm *MaintenanceManager) GetQueue() *MaintenanceQueue {
	return mm.queue
}

// GetConfig returns the maintenance configuration.
//
// The returned pointer is the configuration the manager currently holds, not
// a copy. It is read without taking mutex.
func (mm *MaintenanceManager) GetConfig() *MaintenanceConfig {
	return mm.config
}

// GetStats returns the queue's statistics with scan timing filled in.
//
// LastScanTime is a placeholder: it is set to the time of this call, because
// the real time of the last scan is not tracked. NextScanTime is the current
// time plus the expected wait: the configured scan interval when there are no
// recorded failures, otherwise the backoff delay capped at ten times the
// configured interval.
func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
	stats := mm.queue.GetStats()

	// Snapshot the error state under the read lock.
	mm.mutex.RLock()
	failures, backoff := mm.errorCount, mm.backoffDelay
	configured := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
	mm.mutex.RUnlock()

	// Work out how long until the next scan is expected.
	wait := configured
	if failures > 0 {
		wait = backoff
		if ceiling := configured * 10; wait > ceiling {
			wait = ceiling
		}
	}

	stats.LastScanTime = time.Now() // Would need to track this properly
	stats.NextScanTime = time.Now().Add(wait)

	return stats
}

// GetErrorState returns a snapshot of the scan error tracking for monitoring:
// the number of recorded failures, the most recent error (nil when none is
// recorded) and the current backoff delay. The values are read together under
// the read lock.
func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
	mm.mutex.RLock()
	errorCount = mm.errorCount
	lastError = mm.lastError
	backoffDelay = mm.backoffDelay
	mm.mutex.RUnlock()

	return errorCount, lastError, backoffDelay
}

// GetTasks returns tasks with filtering.
//
// It forwards status, taskType and limit unchanged to the queue's GetTasks
// and returns its result; the filtering itself is implemented by the queue.
func (mm *MaintenanceManager) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
	return mm.queue.GetTasks(status, taskType, limit)
}

// GetWorkers returns all registered workers.
//
// It returns whatever the queue's GetWorkers returns.
func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
	return mm.queue.GetWorkers()
}

// TriggerScan requests an immediate maintenance scan.
//
// When the manager is running, the scan is started in its own goroutine and
// nil is returned right away; the outcome of the scan is not reported to the
// caller. When the manager is not running, an error is returned and no scan
// is started.
func (mm *MaintenanceManager) TriggerScan() error {
	if mm.running {
		go mm.performScan()
		return nil
	}

	return fmt.Errorf("maintenance manager is not running")
}

// UpdateConfig replaces the manager's configuration.
//
// A nil config is rejected with an error and nothing changes. Otherwise the
// manager adopts the given pointer and the new policy is pushed to both the
// queue and the scanner. The assignments are made without taking any lock.
func (mm *MaintenanceManager) UpdateConfig(config *MaintenanceConfig) error {
	if config == nil {
		return fmt.Errorf("config cannot be nil")
	}

	mm.config = config

	// Queue and scanner each keep their own reference to the policy.
	newPolicy := config.Policy
	mm.queue.policy = newPolicy
	mm.scanner.policy = newPolicy

	glog.V(1).Infof("Maintenance configuration updated")
	return nil
}

// CancelTask cancels a task that is still pending.
//
// The queue's mutex is held for the whole operation. An error is returned
// when the task ID is unknown or when the task is in any status other than
// pending. On success the task is marked cancelled, its completion time is set
// to now and it is removed from the queue's pending list.
func (mm *MaintenanceManager) CancelTask(taskID string) error {
	q := mm.queue
	q.mutex.Lock()
	defer q.mutex.Unlock()

	task, found := q.tasks[taskID]
	if !found {
		return fmt.Errorf("task %s not found", taskID)
	}
	if task.Status != TaskStatusPending {
		return fmt.Errorf("task %s cannot be cancelled (status: %s)", taskID, task.Status)
	}

	task.Status = TaskStatusCancelled
	cancelledAt := time.Now()
	task.CompletedAt = &cancelledAt

	// Drop the first pending entry that carries this ID.
	for idx := range q.pendingTasks {
		if q.pendingTasks[idx].ID == taskID {
			q.pendingTasks = append(q.pendingTasks[:idx], q.pendingTasks[idx+1:]...)
			break
		}
	}

	glog.V(2).Infof("Cancelled task %s", taskID)
	return nil
}

// RegisterWorker registers a new worker.
//
// The worker is passed unchanged to the queue's RegisterWorker, which does the
// actual bookkeeping.
func (mm *MaintenanceManager) RegisterWorker(worker *MaintenanceWorker) {
	mm.queue.RegisterWorker(worker)
}

// GetNextTask returns the next task for a worker.
//
// workerID and capabilities are forwarded unchanged to the queue's
// GetNextTask, and its result is returned as is.
// NOTE(review): presumably the result is nil when no suitable task is
// available — confirm against MaintenanceQueue.GetNextTask.
func (mm *MaintenanceManager) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
	return mm.queue.GetNextTask(workerID, capabilities)
}

// CompleteTask marks a task as completed.
//
// taskID and errMsg are forwarded unchanged to the queue's CompleteTask.
// NOTE(review): presumably an empty errMsg means the task succeeded and a
// non-empty one carries the failure description — confirm against
// MaintenanceQueue.CompleteTask.
//
// The second parameter is named errMsg rather than error so that it does not
// shadow the predeclared error type inside the function.
func (mm *MaintenanceManager) CompleteTask(taskID string, errMsg string) {
	mm.queue.CompleteTask(taskID, errMsg)
}

// UpdateTaskProgress updates task progress.
//
// taskID and progress are forwarded unchanged to the queue's
// UpdateTaskProgress.
// NOTE(review): the expected range of progress (0-1 or 0-100) is not visible
// here — confirm against MaintenanceQueue.UpdateTaskProgress.
func (mm *MaintenanceManager) UpdateTaskProgress(taskID string, progress float64) {
	mm.queue.UpdateTaskProgress(taskID, progress)
}

// UpdateWorkerHeartbeat updates worker heartbeat.
//
// workerID is forwarded unchanged to the queue's UpdateWorkerHeartbeat.
func (mm *MaintenanceManager) UpdateWorkerHeartbeat(workerID string) {
	mm.queue.UpdateWorkerHeartbeat(workerID)
}