aboutsummaryrefslogtreecommitdiff
path: root/DESIGN.md
blob: d164467c3c31e83daf52e9c8afac1c10b8f009a2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# SeaweedFS Task Distribution System Design

## Overview

This document describes the design of a distributed task management system for SeaweedFS that handles Erasure Coding (EC) and vacuum operations through a scalable admin server and worker process architecture.

## System Architecture

### High-Level Components

```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Master        │◄──►│  Admin Server    │◄──►│   Workers       │
│                 │    │                  │    │                 │
│ - Volume Info   │    │ - Task Discovery │    │ - Task Exec     │
│ - Shard Status  │    │ - Task Assign    │    │ - Progress      │
│ - Heartbeats    │    │ - Progress Track │    │ - Error Report  │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                        │                        │
         │                        │                        │
         ▼                        ▼                        ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Volume Servers  │    │ Volume Monitor   │    │ Task Execution  │
│                 │    │                  │    │                 │
│ - Store Volumes │    │ - Health Check   │    │ - EC Convert    │
│ - EC Shards     │    │ - Usage Stats    │    │ - Vacuum Clean  │
│ - Report Status │    │ - State Sync     │    │ - Status Report │
└─────────────────┘    └──────────────────┘    └─────────────────┘
```

## 1. Admin Server Design

### 1.1 Core Responsibilities

- **Task Discovery**: Scan volumes to identify EC and vacuum candidates
- **Worker Management**: Track available workers and their capabilities  
- **Task Assignment**: Match tasks to optimal workers
- **Progress Tracking**: Monitor in-progress tasks for capacity planning
- **State Reconciliation**: Sync with master server for volume state updates

### 1.2 Task Discovery Engine

```go
type TaskDiscoveryEngine struct {
    masterClient   MasterClient
    volumeScanner  VolumeScanner
    taskDetectors  map[TaskType]TaskDetector
    scanInterval   time.Duration
}

type VolumeCandidate struct {
    VolumeID       uint32
    Server         string
    Collection     string
    TaskType       TaskType
    Priority       TaskPriority
    Reason         string
    DetectedAt     time.Time
    Parameters     map[string]interface{}
}
```

**EC Detection Logic**:
- Find volumes >= 95% full and idle for > 1 hour
- Exclude volumes already in EC format
- Exclude volumes with ongoing operations
- Prioritize by collection and age

**Vacuum Detection Logic**:
- Find volumes with garbage ratio > 30%
- Exclude read-only volumes
- Exclude volumes with recent vacuum operations
- Prioritize by garbage percentage

### 1.3 Worker Registry & Management

```go
type WorkerRegistry struct {
    workers        map[string]*Worker
    capabilities   map[TaskType][]*Worker
    lastHeartbeat  map[string]time.Time
    taskAssignment map[string]*Task
    mutex          sync.RWMutex
}

type Worker struct {
    ID            string
    Address       string
    Capabilities  []TaskType
    MaxConcurrent int
    CurrentLoad   int
    Status        WorkerStatus
    LastSeen      time.Time
    Performance   WorkerMetrics
}
```

### 1.4 Task Assignment Algorithm

```go
type TaskScheduler struct {
    registry       *WorkerRegistry
    taskQueue      *PriorityQueue
    inProgressTasks map[string]*InProgressTask
    volumeReservations map[uint32]*VolumeReservation
}

// Worker Selection Criteria:
// 1. Has required capability (EC or Vacuum)
// 2. Available capacity (CurrentLoad < MaxConcurrent)
// 3. Best performance history for task type
// 4. Lowest current load
// 5. Geographically close to volume server (optional)
```

## 2. Worker Process Design

### 2.1 Worker Architecture

```go
type MaintenanceWorker struct {
    id              string
    config          *WorkerConfig
    adminClient     AdminClient
    taskExecutors   map[TaskType]TaskExecutor
    currentTasks    map[string]*RunningTask
    registry        *TaskRegistry
    heartbeatTicker *time.Ticker
    requestTicker   *time.Ticker
}
```

### 2.2 Task Execution Framework

```go
type TaskExecutor interface {
    Execute(ctx context.Context, task *Task) error
    EstimateTime(task *Task) time.Duration
    ValidateResources(task *Task) error
    GetProgress() float64
    Cancel() error
}

type ErasureCodingExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}

type VacuumExecutor struct {
    volumeClient VolumeServerClient
    progress     float64
    cancelled    bool
}
```

### 2.3 Worker Capabilities & Registration

```go
type WorkerCapabilities struct {
    SupportedTasks   []TaskType
    MaxConcurrent    int
    ResourceLimits   ResourceLimits
    PreferredServers []string  // Affinity for specific volume servers
}

type ResourceLimits struct {
    MaxMemoryMB      int64
    MaxDiskSpaceMB   int64
    MaxNetworkMbps   int64
    MaxCPUPercent    float64
}
```

## 3. Task Lifecycle Management

### 3.1 Task States

```go
type TaskState string

const (
    TaskStatePending     TaskState = "pending"
    TaskStateAssigned    TaskState = "assigned"
    TaskStateInProgress  TaskState = "in_progress"
    TaskStateCompleted   TaskState = "completed"
    TaskStateFailed      TaskState = "failed"
    TaskStateCancelled   TaskState = "cancelled"
    TaskStateStuck       TaskState = "stuck"       // Taking too long
    TaskStateDuplicate   TaskState = "duplicate"   // Detected duplicate
)
```

### 3.2 Progress Tracking & Monitoring

```go
type InProgressTask struct {
    Task           *Task
    WorkerID       string
    StartedAt      time.Time
    LastUpdate     time.Time
    Progress       float64
    EstimatedEnd   time.Time
    VolumeReserved bool  // Reserved for capacity planning
}

type TaskMonitor struct {
    inProgressTasks map[string]*InProgressTask
    timeoutChecker  *time.Ticker
    stuckDetector   *time.Ticker
    duplicateChecker *time.Ticker
}
```

## 4. Volume Capacity Reconciliation

### 4.1 Volume State Tracking

```go
type VolumeStateManager struct {
    masterClient      MasterClient
    inProgressTasks   map[uint32]*InProgressTask  // VolumeID -> Task
    committedChanges  map[uint32]*VolumeChange    // Changes not yet in master
    reconcileInterval time.Duration
}

type VolumeChange struct {
    VolumeID     uint32
    ChangeType   ChangeType  // "ec_encoding", "vacuum_completed"
    OldCapacity  int64
    NewCapacity  int64
    TaskID       string
    CompletedAt  time.Time
    ReportedToMaster bool
}
```

### 4.2 Shard Assignment Integration

When the master needs to assign shards, it must consider:
1. **Current volume state** from its own records
2. **In-progress capacity changes** from admin server
3. **Committed but unreported changes** from admin server

```go
type CapacityOracle struct {
    adminServer   AdminServerClient
    masterState   *MasterVolumeState
    updateFreq    time.Duration
}

func (o *CapacityOracle) GetAdjustedCapacity(volumeID uint32) int64 {
    baseCapacity := o.masterState.GetCapacity(volumeID)
    
    // Adjust for in-progress tasks
    if task := o.adminServer.GetInProgressTask(volumeID); task != nil {
        switch task.Type {
        case TaskTypeErasureCoding:
            // EC reduces effective capacity
            return baseCapacity / 2  // Simplified
        case TaskTypeVacuum:
            // Vacuum may increase available space
            return baseCapacity + int64(float64(baseCapacity) * 0.3)
        }
    }
    
    // Adjust for completed but unreported changes
    if change := o.adminServer.GetPendingChange(volumeID); change != nil {
        return change.NewCapacity
    }
    
    return baseCapacity
}
```

## 5. Error Handling & Recovery

### 5.1 Worker Failure Scenarios

```go
type FailureHandler struct {
    taskRescheduler *TaskRescheduler
    workerMonitor   *WorkerMonitor
    alertManager    *AlertManager
}

// Failure Scenarios:
// 1. Worker becomes unresponsive (heartbeat timeout)
// 2. Task execution fails (reported by worker)
// 3. Task gets stuck (progress timeout)
// 4. Duplicate task detection
// 5. Resource exhaustion
```

### 5.2 Recovery Strategies

**Worker Timeout Recovery**:
- Mark worker as inactive after 3 missed heartbeats
- Reschedule all assigned tasks to other workers
- Cleanup any partial state

**Task Stuck Recovery**:
- Detect tasks with no progress for > 2x estimated time
- Cancel stuck task and mark volume for cleanup
- Reschedule if retry count < max_retries

**Duplicate Task Prevention**:
```go
type DuplicateDetector struct {
    activeFingerprints map[string]bool  // VolumeID+TaskType
    recentCompleted    *LRUCache        // Recently completed tasks
}

func (d *DuplicateDetector) IsTaskDuplicate(task *Task) bool {
    fingerprint := fmt.Sprintf("%d-%s", task.VolumeID, task.Type)
    return d.activeFingerprints[fingerprint] || 
           d.recentCompleted.Contains(fingerprint)
}
```

## 6. Simulation & Testing Framework

### 6.1 Failure Simulation

```go
type TaskSimulator struct {
    scenarios map[string]SimulationScenario
}

type SimulationScenario struct {
    Name            string
    WorkerCount     int
    VolumeCount     int
    FailurePatterns []FailurePattern
    Duration        time.Duration
}

type FailurePattern struct {
    Type        FailureType  // "worker_timeout", "task_stuck", "duplicate"
    Probability float64      // 0.0 to 1.0
    Timing      TimingSpec   // When during task execution
    Duration    time.Duration
}
```

### 6.2 Test Scenarios

**Scenario 1: Worker Timeout During EC**
- Start EC task on 30GB volume
- Kill worker at 50% progress
- Verify task reassignment
- Verify no duplicate EC operations

**Scenario 2: Stuck Vacuum Task**
- Start vacuum on high-garbage volume
- Simulate worker hanging at 75% progress
- Verify timeout detection and cleanup
- Verify volume state consistency

**Scenario 3: Duplicate Task Prevention**
- Submit same EC task from multiple sources
- Verify only one task executes
- Verify proper conflict resolution

**Scenario 4: Master-Admin State Divergence**
- Create in-progress EC task
- Simulate master restart
- Verify state reconciliation
- Verify shard assignment accounts for in-progress work

## 7. Performance & Scalability

### 7.1 Metrics & Monitoring

```go
type SystemMetrics struct {
    TasksPerSecond     float64
    WorkerUtilization  float64
    AverageTaskTime    time.Duration
    FailureRate        float64
    QueueDepth         int
    VolumeStatesSync   bool
}
```

### 7.2 Scalability Considerations

- **Horizontal Worker Scaling**: Add workers without admin server changes
- **Admin Server HA**: Master-slave admin servers for fault tolerance
- **Task Partitioning**: Partition tasks by collection or datacenter
- **Batch Operations**: Group similar tasks for efficiency

## 8. Implementation Plan

### Phase 1: Core Infrastructure
1. Admin server basic framework
2. Worker registration and heartbeat
3. Simple task assignment
4. Basic progress tracking

### Phase 2: Advanced Features
1. Volume state reconciliation
2. Sophisticated worker selection
3. Failure detection and recovery
4. Duplicate prevention

### Phase 3: Optimization & Monitoring
1. Performance metrics
2. Load balancing algorithms
3. Capacity planning integration
4. Comprehensive monitoring

This design provides a robust, scalable foundation for distributed task management in SeaweedFS while maintaining consistency with the existing architecture patterns.