# SeaweedFS Task Distribution System Design
## Overview
This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.
## System Architecture
### High-Level Components
```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│     Master      │◄──►│   Admin Server   │◄──►│     Workers     │
│                 │    │                  │    │                 │
│ - Volume Info   │    │ - Task Discovery │    │ - Task Exec     │
│ - Shard Status  │    │ - Task Assign    │    │ - Progress      │
│ - Heartbeats    │    │ - Progress Track │    │ - Error Report  │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                      │                       │
         │                      │                       │
         ▼                      ▼                       ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Volume Servers  │    │  Volume Monitor  │    │ Task Execution  │
│                 │    │                  │    │                 │
│ - Store Volumes │    │ - Health Check   │    │ - EC Convert    │
│ - EC Shards     │    │ - Usage Stats    │    │ - Vacuum Clean  │
│ - Report Status │    │ - State Sync     │    │ - Status Report │
└─────────────────┘    └──────────────────┘    └─────────────────┘
```
## 1. Admin Server Design
### 1.1 Core Responsibilities
- **Task Discovery**: Scan volumes to identify EC and vacuum candidates
- **Worker Management**: Track available workers and their capabilities
- **Task Assignment**: Match tasks to optimal workers
- **Progress Tracking**: Monitor in-progress tasks for capacity planning
- **State Reconciliation**: Sync with master server for volume state updates
### 1.2 Task Discovery Engine
```go
type TaskDiscoveryEngine struct {
	masterClient  MasterClient
	volumeScanner VolumeScanner
	taskDetectors map[TaskType]TaskDetector
	scanInterval  time.Duration
}

type VolumeCandidate struct {
	VolumeID   uint32
	Server     string
	Collection string
	TaskType   TaskType
	Priority   TaskPriority
	Reason     string
	DetectedAt time.Time
	Parameters map[string]interface{}
}
```
**EC Detection Logic**:
- Find volumes >= 95% full and idle for > 1 hour
- Exclude volumes already in EC format
- Exclude volumes with ongoing operations
- Prioritize by collection and age
**Vacuum Detection Logic**:
- Find volumes with garbage ratio > 30%
- Exclude read-only volumes
- Exclude volumes with recent vacuum operations
- Prioritize by garbage percentage
### 1.3 Worker Registry & Management
```go
type WorkerRegistry struct {
	workers        map[string]*Worker
	capabilities   map[TaskType][]*Worker
	lastHeartbeat  map[string]time.Time
	taskAssignment map[string]*Task
	mutex          sync.RWMutex
}

type Worker struct {
	ID            string
	Address       string
	Capabilities  []TaskType
	MaxConcurrent int
	CurrentLoad   int
	Status        WorkerStatus
	LastSeen      time.Time
	Performance   WorkerMetrics
}
```
### 1.4 Task Assignment Algorithm
```go
type TaskScheduler struct {
	registry           *WorkerRegistry
	taskQueue          *PriorityQueue
	inProgressTasks    map[string]*InProgressTask
	volumeReservations map[uint32]*VolumeReservation
}

// Worker Selection Criteria:
// 1. Has required capability (EC or Vacuum)
// 2. Available capacity (CurrentLoad < MaxConcurrent)
// 3. Best performance history for task type
// 4. Lowest current load
// 5. Geographically close to volume server (optional)
```
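One way the first four selection criteria could be combined is sketched below. It is an illustration only: the trimmed `Worker` type and the `SuccessRate` field (a stand-in for the unspecified `WorkerMetrics`) are assumptions, and the optional locality criterion is omitted.

```go
package main

import "fmt"

type TaskType string

// Worker is a trimmed-down version of the registry entry. SuccessRate is a
// hypothetical summary of performance history for the task type.
type Worker struct {
	ID            string
	Capabilities  []TaskType
	MaxConcurrent int
	CurrentLoad   int
	SuccessRate   float64
}

func (w *Worker) supports(t TaskType) bool {
	for _, c := range w.Capabilities {
		if c == t {
			return true
		}
	}
	return false
}

// selectWorker filters by capability and free capacity, then prefers the
// best success rate and breaks ties by lowest current load.
// It returns nil when no worker qualifies.
func selectWorker(workers []*Worker, t TaskType) *Worker {
	var best *Worker
	for _, w := range workers {
		if !w.supports(t) || w.CurrentLoad >= w.MaxConcurrent {
			continue
		}
		if best == nil ||
			w.SuccessRate > best.SuccessRate ||
			(w.SuccessRate == best.SuccessRate && w.CurrentLoad < best.CurrentLoad) {
			best = w
		}
	}
	return best
}

func main() {
	workers := []*Worker{
		{ID: "w1", Capabilities: []TaskType{"vacuum"}, MaxConcurrent: 2, SuccessRate: 0.99},
		{ID: "w2", Capabilities: []TaskType{"ec"}, MaxConcurrent: 2, CurrentLoad: 2, SuccessRate: 0.99},
		{ID: "w3", Capabilities: []TaskType{"ec"}, MaxConcurrent: 2, CurrentLoad: 1, SuccessRate: 0.90},
		{ID: "w4", Capabilities: []TaskType{"ec"}, MaxConcurrent: 2, CurrentLoad: 0, SuccessRate: 0.90},
	}
	if w := selectWorker(workers, "ec"); w != nil {
		fmt.Println("selected:", w.ID)
	}
}
```

A weighted score over these inputs would be an alternative to the strict ordering used here; the design does not say which is intended.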
## 2. Worker Process Design
### 2.1 Worker Architecture
```go
type MaintenanceWorker struct {
	id              string
	config          *WorkerConfig
	adminClient     AdminClient
	taskExecutors   map[TaskType]TaskExecutor
	currentTasks    map[string]*RunningTask
	registry        *TaskRegistry
	heartbeatTicker *time.Ticker
	requestTicker   *time.Ticker
}
```
### 2.2 Task Execution Framework
```go
type TaskExecutor interface {
	Execute(ctx context.Context, task *Task) error
	EstimateTime(task *Task) time.Duration
	ValidateResources(task *Task) error
	GetProgress() float64
	Cancel() error
}

type ErasureCodingExecutor struct {
	volumeClient VolumeServerClient
	progress     float64
	cancelled    bool
}

type VacuumExecutor struct {
	volumeClient VolumeServerClient
	progress     float64
	cancelled    bool
}
```
### 2.3 Worker Capabilities & Registration
```go
type WorkerCapabilities struct {
	SupportedTasks   []TaskType
	MaxConcurrent    int
	ResourceLimits   ResourceLimits
	PreferredServers []string // Affinity for specific volume servers
}

type ResourceLimits struct {
	MaxMemoryMB    int64
	MaxDiskSpaceMB int64
	MaxNetworkMbps int64
	MaxCPUPercent  float64
}
```
## 3. Task Lifecycle Management
### 3.1 Task States
```go
type TaskState string

const (
	TaskStatePending    TaskState = "pending"
	TaskStateAssigned   TaskState = "assigned"
	TaskStateInProgress TaskState = "in_progress"
	TaskStateCompleted  TaskState = "completed"
	TaskStateFailed     TaskState = "failed"
	TaskStateCancelled  TaskState = "cancelled"
	TaskStateStuck      TaskState = "stuck"     // Taking too long
	TaskStateDuplicate  TaskState = "duplicate" // Detected duplicate
)
```
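The design lists the states but not the legal transitions between them. The sketch below shows how a transition table could guard state changes; the specific edges in `allowedTransitions` are an assumption made for illustration and would need to be confirmed against the intended lifecycle.

```go
package main

import "fmt"

type TaskState string

const (
	TaskStatePending    TaskState = "pending"
	TaskStateAssigned   TaskState = "assigned"
	TaskStateInProgress TaskState = "in_progress"
	TaskStateCompleted  TaskState = "completed"
	TaskStateFailed     TaskState = "failed"
	TaskStateCancelled  TaskState = "cancelled"
	TaskStateStuck      TaskState = "stuck"
	TaskStateDuplicate  TaskState = "duplicate"
)

// allowedTransitions is a hypothetical transition table. States without an
// entry (completed, failed, cancelled, duplicate) are treated as terminal.
var allowedTransitions = map[TaskState][]TaskState{
	TaskStatePending:    {TaskStateAssigned, TaskStateCancelled, TaskStateDuplicate},
	TaskStateAssigned:   {TaskStateInProgress, TaskStatePending, TaskStateFailed, TaskStateCancelled},
	TaskStateInProgress: {TaskStateCompleted, TaskStateFailed, TaskStateCancelled, TaskStateStuck},
	TaskStateStuck:      {TaskStatePending, TaskStateFailed, TaskStateCancelled},
}

// canTransition reports whether moving from one state to another is allowed
// by the table above.
func canTransition(from, to TaskState) bool {
	for _, s := range allowedTransitions[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(TaskStatePending, TaskStateAssigned))
	fmt.Println(canTransition(TaskStateCompleted, TaskStateInProgress))
}
```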
### 3.2 Progress Tracking & Monitoring
```go
type InProgressTask struct {
	Task           *Task
	WorkerID       string
	StartedAt      time.Time
	LastUpdate     time.Time
	Progress       float64
	EstimatedEnd   time.Time
	VolumeReserved bool // Reserved for capacity planning
}

type TaskMonitor struct {
	inProgressTasks  map[string]*InProgressTask
	timeoutChecker   *time.Ticker
	stuckDetector    *time.Ticker
	duplicateChecker *time.Ticker
}
```
## 4. Volume Capacity Reconciliation
### 4.1 Volume State Tracking
```go
type VolumeStateManager struct {
	masterClient      MasterClient
	inProgressTasks   map[uint32]*InProgressTask // VolumeID -> Task
	committedChanges  map[uint32]*VolumeChange   // Changes not yet in master
	reconcileInterval time.Duration
}

type VolumeChange struct {
	VolumeID         uint32
	ChangeType       ChangeType // "ec_encoding", "vacuum_completed"
	OldCapacity      int64
	NewCapacity      int64
	TaskID           string
	CompletedAt      time.Time
	ReportedToMaster bool
}
```
### 4.2 Shard Assignment Integration
When the master needs to assign shards, it must consider:
1. **Current volume state** from its own records
2. **In-progress capacity changes** from admin server
3. **Committed but unreported changes** from admin server
```go
type CapacityOracle struct {
	adminServer AdminServerClient
	masterState *MasterVolumeState
	updateFreq  time.Duration
}

func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
	baseCapacity := o.masterState.GetCapacity(volumeID)

	// Adjust for in-progress tasks
	if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
		switch task.Type {
		case TaskTypeErasureCoding:
			// EC reduces effective capacity
			return baseCapacity / 2 // Simplified
		case TaskTypeVacuum:
			// Vacuum may increase available space
			return baseCapacity + int64(float64(baseCapacity)*0.3)
		}
	}

	// Adjust for completed but unreported changes
	if change := o.adminServer.GetPendingChange(volumeID); change != nil {
		return change.NewCapacity
	}

	return baseCapacity
}
```
## 5. Error Handling & Recovery
### 5.1 Worker Failure Scenarios
```go
type FailureHandler struct {
	taskRescheduler *TaskRescheduler
	workerMonitor   *WorkerMonitor
	alertManager    *AlertManager
}

// Failure Scenarios:
// 1. Worker becomes unresponsive (heartbeat timeout)
// 2. Task execution fails (reported by worker)
// 3. Task gets stuck (progress timeout)
// 4. Duplicate task detection
// 5. Resource exhaustion
```
### 5.2 Recovery Strategies
**Worker Timeout Recovery**:
- Mark worker as inactive after 3 missed heartbeats
- Reschedule all assigned tasks to other workers
- Clean up any partial state
**Task Stuck Recovery**:
- Detect tasks with no progress for > 2x estimated time
- Cancel stuck task and mark volume for cleanup
- Reschedule if retry count < max_retries
**Duplicate Task Prevention**:
```go
type DuplicateDetector struct {
	activeFingerprints map[string]bool // VolumeID+TaskType
	recentCompleted    *LRUCache       // Recently completed tasks
}

func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
	fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
	return d.activeFingerprints[fingerprint] ||
		d.recentCompleted.Contains(fingerprint)
}
```
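The detector above only answers whether a fingerprint is present; something also has to add and remove fingerprints as tasks start and finish, and do so safely when several goroutines submit tasks. The sketch below shows one way to make check-and-register atomic. The `activeTasks` type and its methods are hypothetical, and the recently-completed cache is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// activeTasks is a hypothetical concurrency-safe set of task fingerprints.
type activeTasks struct {
	mu     sync.Mutex
	active map[string]bool
}

func newActiveTasks() *activeTasks {
	return &activeTasks{active: make(map[string]bool)}
}

// fingerprint uses the same VolumeID+TaskType format as the detector above.
func fingerprint(volumeID uint32, taskType string) string {
	return fmt.Sprintf("%d-%s", volumeID, taskType)
}

// TryAcquire registers the fingerprint and returns true, or returns false
// if a task with the same fingerprint is already active.
func (a *activeTasks) TryAcquire(fp string) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.active[fp] {
		return false
	}
	a.active[fp] = true
	return true
}

// Release removes the fingerprint once the task completes, fails, or is cancelled.
func (a *activeTasks) Release(fp string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.active, fp)
}

func main() {
	tasks := newActiveTasks()
	fp := fingerprint(42, "erasure_coding")
	fmt.Println(tasks.TryAcquire(fp)) // first submission
	fmt.Println(tasks.TryAcquire(fp)) // duplicate submission
	tasks.Release(fp)
	fmt.Println(tasks.TryAcquire(fp)) // allowed again after release
}
```

Combining the check and the registration under one lock avoids the window in which two submitters both see "not a duplicate" before either records the task.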
## 6. Simulation & Testing Framework
### 6.1 Failure Simulation
```go
type TaskSimulator struct {
	scenarios map[string]SimulationScenario
}

type SimulationScenario struct {
	Name            string
	WorkerCount     int
	VolumeCount     int
	FailurePatterns []FailurePattern
	Duration        time.Duration
}

type FailurePattern struct {
	Type        FailureType   // "worker_timeout", "task_stuck", "duplicate"
	Probability float64       // 0.0 to 1.0
	Timing      TimingSpec    // When during task execution
	Duration    time.Duration
}
```
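To illustrate how the `Probability` field might drive fault injection, the sketch below draws from a seeded random source so that a simulation run is reproducible. The trimmed `FailurePattern` and the helper names are assumptions; timing and duration handling are not shown.

```go
package main

import (
	"fmt"
	"math/rand"
)

// FailurePattern is a trimmed-down version of the simulator type above.
type FailurePattern struct {
	Type        string
	Probability float64 // 0.0 to 1.0
}

// shouldInject draws one sample in [0, 1) and injects the failure when the
// sample falls below the configured probability.
func shouldInject(p FailurePattern, rng *rand.Rand) bool {
	return rng.Float64() < p.Probability
}

// countInjected runs a number of trials with a fixed seed, which makes a
// simulated scenario repeatable across runs.
func countInjected(p FailurePattern, trials int, seed int64) int {
	rng := rand.New(rand.NewSource(seed))
	n := 0
	for i := 0; i < trials; i++ {
		if shouldInject(p, rng) {
			n++
		}
	}
	return n
}

func main() {
	p := FailurePattern{Type: "worker_timeout", Probability: 0.1}
	fmt.Printf("injected %d failures in 1000 trials\n", countInjected(p, 1000, 42))
}
```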
### 6.2 Test Scenarios
**Scenario 1: Worker Timeout During EC**
- Start EC task on 30GB volume
- Kill worker at 50% progress
- Verify task reassignment
- Verify no duplicate EC operations
**Scenario 2: Stuck Vacuum Task**
- Start vacuum on high-garbage volume
- Simulate worker hanging at 75% progress
- Verify timeout detection and cleanup
- Verify volume state consistency
**Scenario 3: Duplicate Task Prevention**
- Submit same EC task from multiple sources
- Verify only one task executes
- Verify proper conflict resolution
**Scenario 4: Master-Admin State Divergence**
- Create in-progress EC task
- Simulate master restart
- Verify state reconciliation
- Verify shard assignment accounts for in-progress work
## 7. Performance & Scalability
### 7.1 Metrics & Monitoring
```go
type SystemMetrics struct {
	TasksPerSecond    float64
	WorkerUtilization float64
	AverageTaskTime   time.Duration
	FailureRate       float64
	QueueDepth        int
	VolumeStatesSync  bool
}
```
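The design does not define how these metrics are computed. As one plausible reading, the sketch below derives `WorkerUtilization` as used slots over total slots and `FailureRate` as failed tasks over finished tasks; both definitions and the `WorkerLoad` type are assumptions.

```go
package main

import "fmt"

// WorkerLoad is a hypothetical view of one worker's slot usage.
type WorkerLoad struct {
	CurrentLoad   int
	MaxConcurrent int
}

// workerUtilization returns the fraction of all task slots currently in use.
func workerUtilization(workers []WorkerLoad) float64 {
	used, total := 0, 0
	for _, w := range workers {
		used += w.CurrentLoad
		total += w.MaxConcurrent
	}
	if total == 0 {
		return 0
	}
	return float64(used) / float64(total)
}

// failureRate returns failed tasks as a fraction of all finished tasks.
func failureRate(completed, failed int) float64 {
	finished := completed + failed
	if finished == 0 {
		return 0
	}
	return float64(failed) / float64(finished)
}

func main() {
	workers := []WorkerLoad{{CurrentLoad: 1, MaxConcurrent: 2}, {CurrentLoad: 1, MaxConcurrent: 2}}
	fmt.Printf("utilization: %.2f\n", workerUtilization(workers))
	fmt.Printf("failure rate: %.2f\n", failureRate(3, 1))
}
```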
### 7.2 Scalability Considerations
- **Horizontal Worker Scaling**: Add workers without admin server changes
- **Admin Server HA**: Master-slave admin servers for fault tolerance
- **Task Partitioning**: Partition tasks by collection or datacenter
- **Batch Operations**: Group similar tasks for efficiency
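Task partitioning by collection could be as simple as hashing the collection name to a partition index, as sketched below. The function name and the choice of FNV-1a are assumptions for illustration; the design does not prescribe a partitioning scheme.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a collection name to one of n partitions using FNV-1a.
// All tasks for the same collection land in the same partition.
func partitionFor(collection string, n int) int {
	if n <= 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(collection)) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	for _, c := range []string{"photos", "logs", "backups"} {
		fmt.Printf("%s -> partition %d\n", c, partitionFor(c, 4))
	}
}
```

Plain modulo hashing reshuffles most collections when the partition count changes; a consistent-hashing scheme would be the usual alternative if partitions are expected to be added or removed often.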
## 8. Implementation Plan
### Phase 1: Core Infrastructure
1. Admin server basic framework
2. Worker registration and heartbeat
3. Simple task assignment
4. Basic progress tracking
### Phase 2: Advanced Features
1. Volume state reconciliation
2. Sophisticated worker selection
3. Failure detection and recovery
4. Duplicate prevention
### Phase 3: Optimization & Monitoring
1. Performance metrics
2. Load balancing algorithms
3. Capacity planning integration
4. Comprehensive monitoring
This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.