aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/registry.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-08-01 11:18:32 -0700
committerGitHub <noreply@github.com>2025-08-01 11:18:32 -0700
commit0975968e71b05368d5f28f788cf863c2042c2696 (patch)
tree5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/tasks/registry.go
parent1cba609bfa2306cc2885df212febd5ff954aa693 (diff)
downloadseaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz
seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip
admin: Refactor task destination planning (#7063)
* refactor planning into task detection * refactoring worker tasks * refactor * compiles, but only balance task is registered * compiles, but has nil exception * avoid nil logger * add back ec task * setting ec log directory * implement balance and vacuum tasks * EC tasks will no longer fail with "file not found" errors * Use ReceiveFile API to send locally generated shards * distributing shard files and ecx,ecj,vif files * generate .ecx files correctly * do not mount all possible EC shards (0-13) on every destination * use constants * delete all replicas * rename files * pass in volume size to tasks
Diffstat (limited to 'weed/worker/tasks/registry.go')
-rw-r--r--weed/worker/tasks/registry.go64
1 files changed, 51 insertions, 13 deletions
diff --git a/weed/worker/tasks/registry.go b/weed/worker/tasks/registry.go
index 105055128..626a54a14 100644
--- a/weed/worker/tasks/registry.go
+++ b/weed/worker/tasks/registry.go
@@ -8,23 +8,14 @@ import (
)
var (
- globalRegistry *TaskRegistry
globalTypesRegistry *types.TaskRegistry
globalUIRegistry *types.UIRegistry
- registryOnce sync.Once
+ globalTaskRegistry *TaskRegistry
typesRegistryOnce sync.Once
uiRegistryOnce sync.Once
+ taskRegistryOnce sync.Once
)
-// GetGlobalRegistry returns the global task registry (singleton)
-func GetGlobalRegistry() *TaskRegistry {
- registryOnce.Do(func() {
- globalRegistry = NewTaskRegistry()
- glog.V(1).Infof("Created global task registry")
- })
- return globalRegistry
-}
-
// GetGlobalTypesRegistry returns the global types registry (singleton)
func GetGlobalTypesRegistry() *types.TaskRegistry {
typesRegistryOnce.Do(func() {
@@ -43,9 +34,18 @@ func GetGlobalUIRegistry() *types.UIRegistry {
return globalUIRegistry
}
-// AutoRegister registers a task directly with the global registry
+// GetGlobalTaskRegistry returns the global task registry (singleton)
+func GetGlobalTaskRegistry() *TaskRegistry {
+ taskRegistryOnce.Do(func() {
+ globalTaskRegistry = NewTaskRegistry()
+ glog.V(1).Infof("Created global task registry")
+ })
+ return globalTaskRegistry
+}
+
+// AutoRegister registers a task with the global task registry
func AutoRegister(taskType types.TaskType, factory types.TaskFactory) {
- registry := GetGlobalRegistry()
+ registry := GetGlobalTaskRegistry()
registry.Register(taskType, factory)
glog.V(1).Infof("Auto-registered task type: %s", taskType)
}
@@ -108,3 +108,41 @@ func SetMaintenancePolicyFromTasks() {
// For now, we'll just log that this should be called by the integration layer
glog.V(1).Infof("SetMaintenancePolicyFromTasks called - policy should be built by the integration layer")
}
+
+// TaskRegistry manages task factories
+type TaskRegistry struct {
+ factories map[types.TaskType]types.TaskFactory
+ mutex sync.RWMutex
+}
+
+// NewTaskRegistry creates a new task registry
+func NewTaskRegistry() *TaskRegistry {
+ return &TaskRegistry{
+ factories: make(map[types.TaskType]types.TaskFactory),
+ }
+}
+
+// Register adds a factory to the registry
+func (r *TaskRegistry) Register(taskType types.TaskType, factory types.TaskFactory) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ r.factories[taskType] = factory
+}
+
+// Get returns a factory from the registry
+func (r *TaskRegistry) Get(taskType types.TaskType) types.TaskFactory {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+ return r.factories[taskType]
+}
+
+// GetAll returns all registered factories
+func (r *TaskRegistry) GetAll() map[types.TaskType]types.TaskFactory {
+ r.mutex.RLock()
+ defer r.mutex.RUnlock()
+ result := make(map[types.TaskType]types.TaskFactory)
+ for k, v := range r.factories {
+ result[k] = v
+ }
+ return result
+}