aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/worker.proto
blob: b9e3d61d0fe3c66fe516255559aee4303381a08b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
// Schema for the worker_pb package: the WorkerService streaming RPC, the
// messages exchanged over it, and the maintenance configuration and task
// persistence messages defined below.
syntax = "proto3";

package worker_pb;

// Import path of the Go package that the generated code belongs to.
option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb";

// WorkerService is the service through which a worker and the admin exchange
// messages in both directions.
service WorkerService {
  // WorkerStream is a bidirectional streaming RPC: the caller sends a stream
  // of WorkerMessage (worker -> admin) and receives a stream of AdminMessage
  // (admin -> worker).
  rpc WorkerStream(stream WorkerMessage) returns (stream AdminMessage);
}

// WorkerMessage is the envelope for every message sent from a worker to the
// admin over WorkerStream.
message WorkerMessage {
  // ID of the worker this message belongs to.
  string worker_id = 1;
  // Time associated with the message.
  // NOTE(review): unit and epoch are not stated in this file; confirm against
  // the sender.
  int64 timestamp = 2;

  // Payload of the envelope; at most one variant is set.
  oneof message {
    // Registration sent when the worker connects.
    WorkerRegistration registration = 3;
    // Periodic heartbeat from the worker.
    WorkerHeartbeat heartbeat = 4;
    // Request from the worker for new tasks.
    TaskRequest task_request = 5;
    // Progress report for a task.
    TaskUpdate task_update = 6;
    // Completion report for a task.
    TaskComplete task_complete = 7;
    // Notice that the worker is shutting down.
    WorkerShutdown shutdown = 8;
    // Task logs and metadata returned by the worker.
    TaskLogResponse task_log_response = 9;
  }
}

// AdminMessage is the envelope for every message sent from the admin to a
// worker over WorkerStream.
message AdminMessage {
  // ID of the admin this message belongs to.
  string admin_id = 1;
  // Time associated with the message.
  // NOTE(review): unit and epoch are not stated in this file; confirm against
  // the sender.
  int64 timestamp = 2;

  // Payload of the envelope; at most one variant is set.
  oneof message {
    // Confirmation of a worker registration.
    RegistrationResponse registration_response = 3;
    // Acknowledgement of a heartbeat.
    HeartbeatResponse heartbeat_response = 4;
    // Task handed from the admin to the worker.
    TaskAssignment task_assignment = 5;
    // Request from the admin to cancel a task.
    TaskCancellation task_cancellation = 6;
    // Notice that the admin is shutting down.
    AdminShutdown admin_shutdown = 7;
    // Request for the logs of a specific task.
    TaskLogRequest task_log_request = 8;
  }
}

// WorkerRegistration is sent by a worker when it connects; it is carried in
// WorkerMessage.registration.
message WorkerRegistration {
  // ID of the registering worker.
  string worker_id = 1;
  // Address of the worker.
  // NOTE(review): the expected format (e.g. host:port) is not specified in
  // this file; confirm.
  string address = 2;
  // Capabilities declared by the worker.
  // NOTE(review): presumably task type names; confirm against the worker code.
  repeated string capabilities = 3;
  // Concurrency limit declared by the worker.
  // NOTE(review): presumably the maximum number of simultaneously running
  // tasks; confirm.
  int32 max_concurrent = 4;
  // Free-form string key/value metadata attached to the registration.
  map<string, string> metadata = 5;
}

// RegistrationResponse confirms a worker registration; it is carried in
// AdminMessage.registration_response.
message RegistrationResponse {
  // Whether the registration succeeded.
  bool success = 1;
  // Text accompanying the result.
  // NOTE(review): presumably an error description on failure; confirm.
  string message = 2;
  // Worker ID assigned to the worker.
  // NOTE(review): whether this can differ from the worker_id the worker sent
  // is not visible in this file.
  string assigned_worker_id = 3;
}

// WorkerHeartbeat is sent periodically by a worker; it is carried in
// WorkerMessage.heartbeat.
message WorkerHeartbeat {
  // ID of the worker sending the heartbeat.
  string worker_id = 1;
  // Worker status as a string.
  // NOTE(review): the set of valid values is not defined in this file.
  string status = 2;
  // Current load of the worker.
  // NOTE(review): presumably the number of tasks currently running; confirm.
  int32 current_load = 3;
  // Concurrency limit of the worker.
  int32 max_concurrent = 4;
  // IDs of the worker's current tasks.
  repeated string current_task_ids = 5;
  // Count of completed tasks.
  // NOTE(review): the counting window (since start? since last heartbeat?) is
  // not stated here.
  int32 tasks_completed = 6;
  // Count of failed tasks; same counting-window caveat as tasks_completed.
  int32 tasks_failed = 7;
  // Worker uptime, in seconds.
  int64 uptime_seconds = 8;
}

// HeartbeatResponse acknowledges a heartbeat; it is carried in
// AdminMessage.heartbeat_response.
message HeartbeatResponse {
  // Whether the heartbeat was accepted.
  bool success = 1;
  // Text accompanying the result.
  string message = 2;
}

// TaskRequest is sent by a worker to ask for new tasks; it is carried in
// WorkerMessage.task_request.
message TaskRequest {
  // ID of the requesting worker.
  string worker_id = 1;
  // Capabilities of the requesting worker.
  // NOTE(review): presumably task type names; confirm against the worker code.
  repeated string capabilities = 2;
  // Number of slots the worker reports as available.
  int32 available_slots = 3;
}

// TaskAssignment is sent from the admin to a worker to hand it a task; it is
// carried in AdminMessage.task_assignment.
message TaskAssignment {
  // ID of the assigned task.
  string task_id = 1;
  // Task type as a string.
  // NOTE(review): the set of valid type names is not defined in this file.
  string task_type = 2;
  // Parameters of the task, including the typed per-task-type variant.
  TaskParams params = 3;
  // Task priority.
  // NOTE(review): ordering (higher vs. lower = more urgent) is not stated
  // here; confirm.
  int32 priority = 4;
  // Creation time of the task.
  // NOTE(review): unit and epoch are not stated in this file; confirm.
  int64 created_time = 5;
  // Free-form string key/value metadata attached to the assignment.
  map<string, string> metadata = 6;
}

// TaskParams holds the parameters of a task: common volume and location
// fields, unified source/target lists, and one typed per-task-type variant.
message TaskParams {
  // ActiveTopology task ID for lifecycle management.
  string task_id = 1;
  // Primary volume ID for the task.
  uint32 volume_id = 2;
  // Collection name.
  string collection = 3;
  // Primary data center.
  string data_center = 4;
  // Primary rack.
  string rack = 5;
  // Original volume size in bytes, used for tracking size changes.
  uint64 volume_size = 6;

  // Sources and targets are unified lists shared by all task types.

  // Source locations (volume replicas, EC shards, etc.).
  repeated TaskSource sources = 7;
  // Target locations (destinations, new replicas, etc.).
  repeated TaskTarget targets = 8;

  // Typed task parameters; at most one variant is set.
  oneof task_params {
    // Parameters for vacuum operations.
    VacuumTaskParams vacuum_params = 9;
    // Parameters for EC encoding operations.
    ErasureCodingTaskParams erasure_coding_params = 10;
    // Parameters for volume balancing operations.
    BalanceTaskParams balance_params = 11;
    // Parameters for adding replicas.
    ReplicationTaskParams replication_params = 12;
  }
}

// VacuumTaskParams holds the parameters specific to vacuum operations; it is
// the vacuum_params variant of TaskParams.task_params.
message VacuumTaskParams {
  // Minimum garbage ratio to trigger vacuum.
  double garbage_threshold = 1;
  // Force vacuum even if below threshold.
  bool force_vacuum = 2;
  // Number of files to process per batch.
  int32 batch_size = 3;
  // Working directory for temporary files.
  string working_dir = 4;
  // Verify file checksums during vacuum.
  bool verify_checksum = 5;
}

// ErasureCodingTaskParams holds the parameters specific to EC encoding
// operations; it is the erasure_coding_params variant of
// TaskParams.task_params.
message ErasureCodingTaskParams {
  // Estimated size per shard.
  uint64 estimated_shard_size = 1;
  // Number of data shards (default: 10).
  int32 data_shards = 2;
  // Number of parity shards (default: 4).
  int32 parity_shards = 3;
  // Working directory for EC processing.
  string working_dir = 4;
  // Master server address.
  string master_client = 5;
  // Whether to clean up the source volume after EC.
  bool cleanup_source = 6;
}

// TaskSource describes one source location in a form shared by every task
// type; it is the element type of TaskParams.sources.
message TaskSource {
  // Source server address.
  string node = 1;
  // Source disk ID.
  uint32 disk_id = 2;
  // Source rack, for tracking.
  string rack = 3;
  // Source data center, for tracking.
  string data_center = 4;
  // Volume ID (for volume operations).
  uint32 volume_id = 5;
  // Shard IDs (for EC shard operations).
  repeated uint32 shard_ids = 6;
  // Estimated size to be processed.
  uint64 estimated_size = 7;
}

// TaskTarget describes one target location in a form shared by every task
// type; it is the element type of TaskParams.targets.
message TaskTarget {
  // Target server address.
  string node = 1;
  // Target disk ID.
  uint32 disk_id = 2;
  // Target rack, for tracking.
  string rack = 3;
  // Target data center, for tracking.
  string data_center = 4;
  // Volume ID (for volume operations).
  uint32 volume_id = 5;
  // Shard IDs (for EC shard operations).
  repeated uint32 shard_ids = 6;
  // Estimated size to be created.
  uint64 estimated_size = 7;
}



// BalanceTaskParams holds the parameters specific to volume balancing
// operations; it is the balance_params variant of TaskParams.task_params.
message BalanceTaskParams {
  // Force move even with conflicts.
  bool force_move = 1;
  // Operation timeout, in seconds.
  int32 timeout_seconds = 2;
}

// ReplicationTaskParams holds the parameters specific to adding replicas; it
// is the replication_params variant of TaskParams.task_params.
message ReplicationTaskParams {
  // Target replica count.
  int32 replica_count = 1;
  // Verify replica consistency after creation.
  bool verify_consistency = 2;
}

// TaskUpdate reports the progress of a task; it is carried in
// WorkerMessage.task_update.
message TaskUpdate {
  // ID of the task being reported on.
  string task_id = 1;
  // ID of the reporting worker.
  string worker_id = 2;
  // Task status as a string.
  // NOTE(review): the set of valid values is not defined in this file.
  string status = 3;
  // Task progress.
  // NOTE(review): scale (0.0-1.0 vs. 0-100) is not stated here; confirm.
  float progress = 4;
  // Text accompanying the update.
  string message = 5;
  // Free-form string key/value metadata attached to the update.
  map<string, string> metadata = 6;
}

// TaskComplete reports that a task has finished; it is carried in
// WorkerMessage.task_complete.
message TaskComplete {
  // ID of the finished task.
  string task_id = 1;
  // ID of the reporting worker.
  string worker_id = 2;
  // Whether the task succeeded.
  bool success = 3;
  // Error description.
  // NOTE(review): presumably only meaningful when success is false; confirm.
  string error_message = 4;
  // Completion time of the task.
  // NOTE(review): unit and epoch are not stated in this file; confirm.
  int64 completion_time = 5;
  // Free-form string key/value metadata describing the result.
  map<string, string> result_metadata = 6;
}

// TaskCancellation is sent from the admin to cancel a task; it is carried in
// AdminMessage.task_cancellation.
message TaskCancellation {
  // ID of the task to cancel.
  string task_id = 1;
  // Reason for the cancellation.
  string reason = 2;
  // Force flag for the cancellation.
  // NOTE(review): what forcing changes on the worker side is not visible in
  // this file.
  bool force = 3;
}

// WorkerShutdown notifies the admin that a worker is shutting down; it is
// carried in WorkerMessage.shutdown.
message WorkerShutdown {
  // ID of the worker that is shutting down.
  string worker_id = 1;
  // Reason for the shutdown.
  string reason = 2;
  // IDs of tasks reported as pending at shutdown.
  repeated string pending_task_ids = 3;
}

// AdminShutdown notifies a worker that the admin is shutting down; it is
// carried in AdminMessage.admin_shutdown.
message AdminShutdown {
  // Reason for the shutdown.
  string reason = 1;
  // Graceful shutdown period, in seconds.
  // NOTE(review): how the worker is expected to use this window is not
  // visible in this file.
  int32 graceful_shutdown_seconds = 2;
}

// ========== Task Log Messages ==========

// TaskLogRequest asks for the logs of a specific task; it is carried in
// AdminMessage.task_log_request.
message TaskLogRequest {
  // ID of the task whose logs are requested.
  string task_id = 1;
  // ID of the worker the request concerns.
  string worker_id = 2;
  // Include task metadata.
  bool include_metadata = 3;
  // Maximum number of log entries (0 = all).
  int32 max_entries = 4;
  // Filter by log level (INFO, WARNING, ERROR, DEBUG).
  string log_level = 5;
  // Unix timestamp for the start time filter.
  int64 start_time = 6;
  // Unix timestamp for the end time filter.
  int64 end_time = 7;
}

// TaskLogResponse returns task logs and metadata; it is carried in
// WorkerMessage.task_log_response.
message TaskLogResponse {
  // ID of the task the logs belong to.
  string task_id = 1;
  // ID of the responding worker.
  string worker_id = 2;
  // Whether the log request succeeded.
  bool success = 3;
  // Error description.
  // NOTE(review): presumably only meaningful when success is false; confirm.
  string error_message = 4;
  // Metadata about the task execution.
  // NOTE(review): presumably populated only when the request set
  // include_metadata; confirm.
  TaskLogMetadata metadata = 5;
  // Returned log entries.
  repeated TaskLogEntry log_entries = 6;
}

// TaskLogMetadata contains metadata about a task execution; it is returned in
// TaskLogResponse.metadata.
// NOTE(review): the unit and epoch of the int64 time fields (start_time,
// end_time, created_at) are not stated in this file; confirm.
message TaskLogMetadata {
  // ID of the task.
  string task_id = 1;
  // Task type as a string.
  string task_type = 2;
  // ID of the worker that ran the task.
  string worker_id = 3;
  // Start time of the execution.
  int64 start_time = 4;
  // End time of the execution.
  int64 end_time = 5;
  // Duration of the execution, in milliseconds.
  int64 duration_ms = 6;
  // Task status as a string.
  // NOTE(review): the set of valid values is not defined in this file.
  string status = 7;
  // Task progress.
  // NOTE(review): scale (0.0-1.0 vs. 0-100) is not stated here; confirm.
  float progress = 8;
  // ID of the volume the task concerns.
  uint32 volume_id = 9;
  // Server the task concerns.
  string server = 10;
  // Collection name.
  string collection = 11;
  // Path of the log file.
  string log_file_path = 12;
  // Creation time.
  // NOTE(review): whether this refers to the task or to the log record is not
  // visible here.
  int64 created_at = 13;
  // Free-form string key/value custom data.
  map<string, string> custom_data = 14;
}

// TaskLogEntry represents a single log entry; it is the element type of
// TaskLogResponse.log_entries.
message TaskLogEntry {
  // Time of the entry.
  // NOTE(review): unit and epoch are not stated in this file; confirm.
  int64 timestamp = 1;
  // Log level as a string.
  // NOTE(review): TaskLogRequest.log_level lists INFO, WARNING, ERROR and
  // DEBUG; presumably the same vocabulary applies here.
  string level = 2;
  // Log message text.
  string message = 3;
  // String key/value fields attached to the entry.
  map<string, string> fields = 4;
  // Task progress recorded with the entry.
  // NOTE(review): scale is not stated here; confirm.
  float progress = 5;
  // Task status recorded with the entry.
  string status = 6;
}

// ========== Maintenance Configuration Messages ==========

// MaintenanceConfig is the top-level configuration of the maintenance system.
message MaintenanceConfig {
  // Whether the maintenance system is enabled.
  bool enabled = 1;
  // How often to scan for maintenance needs, in seconds.
  int32 scan_interval_seconds = 2;
  // Worker heartbeat timeout, in seconds.
  int32 worker_timeout_seconds = 3;
  // Individual task timeout, in seconds.
  int32 task_timeout_seconds = 4;
  // Delay between retries, in seconds.
  int32 retry_delay_seconds = 5;
  // Default max retries for tasks.
  int32 max_retries = 6;
  // How often to clean up old tasks, in seconds.
  int32 cleanup_interval_seconds = 7;
  // How long to keep completed/failed tasks, in seconds.
  int32 task_retention_seconds = 8;
  // Policies for maintenance operations.
  MaintenancePolicy policy = 9;
}

// MaintenancePolicy defines the policies that govern maintenance operations;
// it is referenced from MaintenanceConfig.policy.
message MaintenancePolicy {
  // Task type -> policy mapping.
  map<string, TaskPolicy> task_policies = 1;
  // Overall concurrency limit across all task types.
  int32 global_max_concurrent = 2;
  // Default repeat interval, in seconds, used if a task doesn't specify one.
  int32 default_repeat_interval_seconds = 3;
  // Default interval, in seconds, for periodic checks.
  int32 default_check_interval_seconds = 4;
}

// TaskPolicy is the configuration for one task type; it is the value type of
// MaintenancePolicy.task_policies.
message TaskPolicy {
  // Whether this task type is enabled.
  bool enabled = 1;
  // Concurrency limit for this task type.
  int32 max_concurrent = 2;
  // Seconds to wait before repeating.
  int32 repeat_interval_seconds = 3;
  // Seconds between checks.
  int32 check_interval_seconds = 4;

  // Typed task-specific configuration (replaces a generic map); at most one
  // variant is set.
  oneof task_config {
    // Vacuum-specific configuration.
    VacuumTaskConfig vacuum_config = 5;
    // EC-specific configuration.
    ErasureCodingTaskConfig erasure_coding_config = 6;
    // Balance-specific configuration.
    BalanceTaskConfig balance_config = 7;
    // Replication-specific configuration.
    ReplicationTaskConfig replication_config = 8;
  }
}

// Task-specific configuration messages

// VacuumTaskConfig holds the vacuum-specific policy settings; it is the
// vacuum_config variant of TaskPolicy.task_config.
message VacuumTaskConfig {
  // Minimum garbage ratio to trigger vacuum (0.0-1.0).
  double garbage_threshold = 1;
  // Minimum volume age, in hours, before vacuum is considered.
  int32 min_volume_age_hours = 2;
  // Minimum time, in seconds, between vacuum operations on the same volume.
  int32 min_interval_seconds = 3;
}

// ErasureCodingTaskConfig holds the EC-specific policy settings; it is the
// erasure_coding_config variant of TaskPolicy.task_config.
message ErasureCodingTaskConfig {
  // Minimum fullness ratio to trigger EC (0.0-1.0).
  double fullness_ratio = 1;
  // Minimum quiet time, in seconds, before EC.
  int32 quiet_for_seconds = 2;
  // Minimum volume size, in MB, for EC.
  int32 min_volume_size_mb = 3;
  // Only process volumes from specific collections.
  string collection_filter = 4;
}

// BalanceTaskConfig holds the balance-specific policy settings; it is the
// balance_config variant of TaskPolicy.task_config.
message BalanceTaskConfig {
  // Threshold for triggering rebalancing (0.0-1.0).
  double imbalance_threshold = 1;
  // Minimum number of servers required for balancing.
  int32 min_server_count = 2;
}

// ReplicationTaskConfig holds the replication-specific policy settings; it is
// the replication_config variant of TaskPolicy.task_config.
message ReplicationTaskConfig {
  // Target number of replicas.
  int32 target_replica_count = 1;
}

// ========== Task Persistence Messages ==========

// MaintenanceTaskData represents the complete state of a task for
// persistence; it is wrapped by TaskStateFile.
// NOTE(review): the unit and epoch of the int64 *_at time fields are not
// stated in this file; confirm against the code that writes them.
message MaintenanceTaskData {
  // Task ID.
  string id = 1;
  // Task type as a string.
  string type = 2;
  // Task priority as a string (TaskAssignment.priority, by contrast, is an
  // int32).
  // NOTE(review): the set of valid values is not defined in this file.
  string priority = 3;
  // Task status as a string.
  // NOTE(review): the set of valid values is not defined in this file.
  string status = 4;
  // ID of the volume the task concerns.
  uint32 volume_id = 5;
  // Server the task concerns.
  string server = 6;
  // Collection name.
  string collection = 7;
  // Typed task parameters.
  TaskParams typed_params = 8;
  // Reason recorded for the task.
  string reason = 9;
  // Time the task was created.
  int64 created_at = 10;
  // Time the task was scheduled.
  int64 scheduled_at = 11;
  // Time the task was started.
  int64 started_at = 12;
  // Time the task was completed.
  int64 completed_at = 13;
  // ID of the worker associated with the task.
  string worker_id = 14;
  // Error text recorded for the task.
  string error = 15;
  // Task progress (double here; TaskUpdate.progress is a float).
  // NOTE(review): scale is not stated here; confirm.
  double progress = 16;
  // Number of retries recorded so far.
  int32 retry_count = 17;
  // Retry limit for the task.
  int32 max_retries = 18;

  // Enhanced fields for detailed task tracking
  // Who or what created the task.
  string created_by = 19;
  // Context in which the task was created.
  string creation_context = 20;
  // Records of worker assignments for this task.
  repeated TaskAssignmentRecord assignment_history = 21;
  // More detailed form of the reason.
  string detailed_reason = 22;
  // Free-form string key/value tags.
  map<string, string> tags = 23;
  // Metrics describing why and how the task was created.
  TaskCreationMetrics creation_metrics = 24;
}

// TaskAssignmentRecord captures one assignment of a task to a worker; it is
// the element type of MaintenanceTaskData.assignment_history.
// NOTE(review): the unit and epoch of the int64 time fields are not stated in
// this file; confirm.
message TaskAssignmentRecord {
  // ID of the assigned worker.
  string worker_id = 1;
  // Address of the assigned worker.
  string worker_address = 2;
  // Time the worker was assigned.
  int64 assigned_at = 3;
  // Optional: when the worker was unassigned.
  int64 unassigned_at = 4;
  // Reason for assignment/unassignment.
  string reason = 5;
}

// TaskCreationMetrics records why and how a task was created; it is
// referenced from MaintenanceTaskData.creation_metrics.
message TaskCreationMetrics {
  // Name of the metric that triggered creation.
  string trigger_metric = 1;
  // Value that triggered creation.
  double metric_value = 2;
  // Threshold that was exceeded.
  double threshold = 3;
  // Volume health at creation time.
  VolumeHealthMetrics volume_metrics = 4;
  // Additional context data.
  map<string, string> additional_data = 5;
}

// VolumeHealthMetrics captures the state of a volume at task creation; it is
// referenced from TaskCreationMetrics.volume_metrics.
// NOTE(review): the unit of the *_size fields (presumably bytes) and of
// last_modified is not stated in this file; confirm.
message VolumeHealthMetrics {
  // Total size of the volume.
  uint64 total_size = 1;
  // Used size of the volume.
  uint64 used_size = 2;
  // Size of garbage in the volume.
  uint64 garbage_size = 3;
  // Garbage ratio of the volume.
  // NOTE(review): presumably garbage_size relative to a total, on a 0.0-1.0
  // scale; confirm.
  double garbage_ratio = 4;
  // Number of files in the volume.
  int32 file_count = 5;
  // Number of deleted files in the volume.
  int32 deleted_file_count = 6;
  // Last modification time of the volume.
  int64 last_modified = 7;
  // Number of replicas of the volume.
  int32 replica_count = 8;
  // Whether the volume is an EC volume.
  bool is_ec_volume = 9;
  // Collection name.
  string collection = 10;
}

// TaskStateFile wraps task data with metadata for persistence.
message TaskStateFile {
  // The persisted task state.
  MaintenanceTaskData task = 1;
  // Time of the last update.
  // NOTE(review): unit and epoch are not stated in this file; confirm.
  int64 last_updated = 2;
  // Admin version string recorded with the file.
  string admin_version = 3;
}