diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-01 11:18:32 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-01 11:18:32 -0700 |
| commit | 0975968e71b05368d5f28f788cf863c2042c2696 (patch) | |
| tree | 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/registry.go | |
| parent | 1cba609bfa2306cc2885df212febd5ff954aa693 (diff) | |
| download | seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip | |
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
Diffstat (limited to 'weed/worker/registry.go')
| -rw-r--r-- | weed/worker/registry.go | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/weed/worker/registry.go b/weed/worker/registry.go index e227beb6a..0b40ddec4 100644 --- a/weed/worker/registry.go +++ b/weed/worker/registry.go @@ -10,7 +10,7 @@ import ( // Registry manages workers and their statistics type Registry struct { - workers map[string]*types.Worker + workers map[string]*types.WorkerData stats *types.RegistryStats mutex sync.RWMutex } @@ -18,7 +18,7 @@ type Registry struct { // NewRegistry creates a new worker registry func NewRegistry() *Registry { return &Registry{ - workers: make(map[string]*types.Worker), + workers: make(map[string]*types.WorkerData), stats: &types.RegistryStats{ TotalWorkers: 0, ActiveWorkers: 0, @@ -33,7 +33,7 @@ func NewRegistry() *Registry { } // RegisterWorker registers a new worker -func (r *Registry) RegisterWorker(worker *types.Worker) error { +func (r *Registry) RegisterWorker(worker *types.WorkerData) error { r.mutex.Lock() defer r.mutex.Unlock() @@ -61,7 +61,7 @@ func (r *Registry) UnregisterWorker(workerID string) error { } // GetWorker returns a worker by ID -func (r *Registry) GetWorker(workerID string) (*types.Worker, bool) { +func (r *Registry) GetWorker(workerID string) (*types.WorkerData, bool) { r.mutex.RLock() defer r.mutex.RUnlock() @@ -70,11 +70,11 @@ func (r *Registry) GetWorker(workerID string) (*types.Worker, bool) { } // ListWorkers returns all registered workers -func (r *Registry) ListWorkers() []*types.Worker { +func (r *Registry) ListWorkers() []*types.WorkerData { r.mutex.RLock() defer r.mutex.RUnlock() - workers := make([]*types.Worker, 0, len(r.workers)) + workers := make([]*types.WorkerData, 0, len(r.workers)) for _, worker := range r.workers { workers = append(workers, worker) } @@ -82,11 +82,11 @@ func (r *Registry) ListWorkers() []*types.Worker { } // GetWorkersByCapability returns workers that support a specific capability -func (r *Registry) GetWorkersByCapability(capability types.TaskType) []*types.Worker { +func (r *Registry) GetWorkersByCapability(capability types.TaskType) []*types.WorkerData { r.mutex.RLock() defer r.mutex.RUnlock() - var workers []*types.Worker + var workers []*types.WorkerData for _, worker := range r.workers { for _, cap := range worker.Capabilities { if cap == capability { @@ -99,11 +99,11 @@ func (r *Registry) GetWorkersByCapability(capability types.TaskType) []*types.Wo } // GetAvailableWorkers returns workers that are available for new tasks -func (r *Registry) GetAvailableWorkers() []*types.Worker { +func (r *Registry) GetAvailableWorkers() []*types.WorkerData { r.mutex.RLock() defer r.mutex.RUnlock() - var workers []*types.Worker + var workers []*types.WorkerData for _, worker := range r.workers { if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent { workers = append(workers, worker) @@ -113,11 +113,11 @@ func (r *Registry) GetAvailableWorkers() []*types.Worker { } // GetBestWorkerForTask returns the best worker for a specific task -func (r *Registry) GetBestWorkerForTask(taskType types.TaskType) *types.Worker { +func (r *Registry) GetBestWorkerForTask(taskType types.TaskType) *types.WorkerData { r.mutex.RLock() defer r.mutex.RUnlock() - var bestWorker *types.Worker + var bestWorker *types.WorkerData var bestScore float64 for _, worker := range r.workers { @@ -277,11 +277,11 @@ func (r *Registry) GetTaskCapabilities() []types.TaskType { } // GetWorkersByStatus returns workers filtered by status -func (r *Registry) GetWorkersByStatus(status string) []*types.Worker { +func (r *Registry) GetWorkersByStatus(status string) []*types.WorkerData { r.mutex.RLock() defer r.mutex.RUnlock() - var workers []*types.Worker + var workers []*types.WorkerData for _, worker := range r.workers { if worker.Status == status { workers = append(workers, worker) |
