aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/registry.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/registry.go')
-rw-r--r--weed/worker/registry.go348
1 files changed, 348 insertions, 0 deletions
diff --git a/weed/worker/registry.go b/weed/worker/registry.go
new file mode 100644
index 000000000..e227beb6a
--- /dev/null
+++ b/weed/worker/registry.go
@@ -0,0 +1,348 @@
+package worker
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Registry manages workers and their statistics
+type Registry struct {
+ workers map[string]*types.Worker
+ stats *types.RegistryStats
+ mutex sync.RWMutex
+}
+
+// NewRegistry creates a new worker registry
+func NewRegistry() *Registry {
+ return &Registry{
+ workers: make(map[string]*types.Worker),
+ stats: &types.RegistryStats{
+ TotalWorkers: 0,
+ ActiveWorkers: 0,
+ BusyWorkers: 0,
+ IdleWorkers: 0,
+ TotalTasks: 0,
+ CompletedTasks: 0,
+ FailedTasks: 0,
+ StartTime: time.Now(),
+ },
+ }
+}
+
+// RegisterWorker registers a new worker
+func (r *Registry) RegisterWorker(worker *types.Worker) error {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
+ if _, exists := r.workers[worker.ID]; exists {
+ return fmt.Errorf("worker %s already registered", worker.ID)
+ }
+
+ r.workers[worker.ID] = worker
+ r.updateStats()
+ return nil
+}
+
+// UnregisterWorker removes a worker from the registry
+func (r *Registry) UnregisterWorker(workerID string) error {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
+ if _, exists := r.workers[workerID]; !exists {
+ return fmt.Errorf("worker %s not found", workerID)
+ }
+
+ delete(r.workers, workerID)
+ r.updateStats()
+ return nil
+}
+
+// GetWorker returns a worker by ID
+func (r *Registry) GetWorker(workerID string) (*types.Worker, bool) {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ worker, exists := r.workers[workerID]
+ return worker, exists
+}
+
+// ListWorkers returns all registered workers
+func (r *Registry) ListWorkers() []*types.Worker {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ workers := make([]*types.Worker, 0, len(r.workers))
+ for _, worker := range r.workers {
+ workers = append(workers, worker)
+ }
+ return workers
+}
+
+// GetWorkersByCapability returns workers that support a specific capability
+func (r *Registry) GetWorkersByCapability(capability types.TaskType) []*types.Worker {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ var workers []*types.Worker
+ for _, worker := range r.workers {
+ for _, cap := range worker.Capabilities {
+ if cap == capability {
+ workers = append(workers, worker)
+ break
+ }
+ }
+ }
+ return workers
+}
+
+// GetAvailableWorkers returns workers that are available for new tasks
+func (r *Registry) GetAvailableWorkers() []*types.Worker {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ var workers []*types.Worker
+ for _, worker := range r.workers {
+ if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent {
+ workers = append(workers, worker)
+ }
+ }
+ return workers
+}
+
+// GetBestWorkerForTask returns the best worker for a specific task
+func (r *Registry) GetBestWorkerForTask(taskType types.TaskType) *types.Worker {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ var bestWorker *types.Worker
+ var bestScore float64
+
+ for _, worker := range r.workers {
+ // Check if worker supports this task type
+ supportsTask := false
+ for _, cap := range worker.Capabilities {
+ if cap == taskType {
+ supportsTask = true
+ break
+ }
+ }
+
+ if !supportsTask {
+ continue
+ }
+
+ // Check if worker is available
+ if worker.Status != "active" || worker.CurrentLoad >= worker.MaxConcurrent {
+ continue
+ }
+
+ // Calculate score based on current load and capacity
+ score := float64(worker.MaxConcurrent-worker.CurrentLoad) / float64(worker.MaxConcurrent)
+ if bestWorker == nil || score > bestScore {
+ bestWorker = worker
+ bestScore = score
+ }
+ }
+
+ return bestWorker
+}
+
+// UpdateWorkerHeartbeat updates the last heartbeat time for a worker
+func (r *Registry) UpdateWorkerHeartbeat(workerID string) error {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
+ worker, exists := r.workers[workerID]
+ if !exists {
+ return fmt.Errorf("worker %s not found", workerID)
+ }
+
+ worker.LastHeartbeat = time.Now()
+ return nil
+}
+
+// UpdateWorkerLoad updates the current load for a worker
+func (r *Registry) UpdateWorkerLoad(workerID string, load int) error {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
+ worker, exists := r.workers[workerID]
+ if !exists {
+ return fmt.Errorf("worker %s not found", workerID)
+ }
+
+ worker.CurrentLoad = load
+ if load >= worker.MaxConcurrent {
+ worker.Status = "busy"
+ } else {
+ worker.Status = "active"
+ }
+
+ r.updateStats()
+ return nil
+}
+
+// UpdateWorkerStatus updates the status of a worker
+func (r *Registry) UpdateWorkerStatus(workerID string, status string) error {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
+ worker, exists := r.workers[workerID]
+ if !exists {
+ return fmt.Errorf("worker %s not found", workerID)
+ }
+
+ worker.Status = status
+ r.updateStats()
+ return nil
+}
+
+// CleanupStaleWorkers removes workers that haven't sent heartbeats recently
+func (r *Registry) CleanupStaleWorkers(timeout time.Duration) int {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
+ var removedCount int
+ cutoff := time.Now().Add(-timeout)
+
+ for workerID, worker := range r.workers {
+ if worker.LastHeartbeat.Before(cutoff) {
+ delete(r.workers, workerID)
+ removedCount++
+ }
+ }
+
+ if removedCount > 0 {
+ r.updateStats()
+ }
+
+ return removedCount
+}
+
+// GetStats returns current registry statistics
+func (r *Registry) GetStats() *types.RegistryStats {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ // Create a copy of the stats to avoid race conditions
+ stats := *r.stats
+ return &stats
+}
+
+// updateStats updates the registry statistics (must be called with lock held)
+func (r *Registry) updateStats() {
+ r.stats.TotalWorkers = len(r.workers)
+ r.stats.ActiveWorkers = 0
+ r.stats.BusyWorkers = 0
+ r.stats.IdleWorkers = 0
+
+ for _, worker := range r.workers {
+ switch worker.Status {
+ case "active":
+ if worker.CurrentLoad > 0 {
+ r.stats.ActiveWorkers++
+ } else {
+ r.stats.IdleWorkers++
+ }
+ case "busy":
+ r.stats.BusyWorkers++
+ }
+ }
+
+ r.stats.Uptime = time.Since(r.stats.StartTime)
+ r.stats.LastUpdated = time.Now()
+}
+
+// GetTaskCapabilities returns all task capabilities available in the registry
+func (r *Registry) GetTaskCapabilities() []types.TaskType {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ capabilitySet := make(map[types.TaskType]bool)
+ for _, worker := range r.workers {
+ for _, cap := range worker.Capabilities {
+ capabilitySet[cap] = true
+ }
+ }
+
+ var capabilities []types.TaskType
+ for cap := range capabilitySet {
+ capabilities = append(capabilities, cap)
+ }
+
+ return capabilities
+}
+
+// GetWorkersByStatus returns workers filtered by status
+func (r *Registry) GetWorkersByStatus(status string) []*types.Worker {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ var workers []*types.Worker
+ for _, worker := range r.workers {
+ if worker.Status == status {
+ workers = append(workers, worker)
+ }
+ }
+ return workers
+}
+
+// GetWorkerCount returns the total number of registered workers
+func (r *Registry) GetWorkerCount() int {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+ return len(r.workers)
+}
+
+// GetWorkerIDs returns all worker IDs
+func (r *Registry) GetWorkerIDs() []string {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ ids := make([]string, 0, len(r.workers))
+ for id := range r.workers {
+ ids = append(ids, id)
+ }
+ return ids
+}
+
+// GetWorkerSummary returns a summary of all workers
+func (r *Registry) GetWorkerSummary() *types.WorkerSummary {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+
+ summary := &types.WorkerSummary{
+ TotalWorkers: len(r.workers),
+ ByStatus: make(map[string]int),
+ ByCapability: make(map[types.TaskType]int),
+ TotalLoad: 0,
+ MaxCapacity: 0,
+ }
+
+ for _, worker := range r.workers {
+ summary.ByStatus[worker.Status]++
+ summary.TotalLoad += worker.CurrentLoad
+ summary.MaxCapacity += worker.MaxConcurrent
+
+ for _, cap := range worker.Capabilities {
+ summary.ByCapability[cap]++
+ }
+ }
+
+ return summary
+}
+
+// Default global registry instance
+var defaultRegistry *Registry
+var registryOnce sync.Once
+
+// GetDefaultRegistry returns the default global registry
+func GetDefaultRegistry() *Registry {
+ registryOnce.Do(func() {
+ defaultRegistry = NewRegistry()
+ })
+ return defaultRegistry
+}