diff options
Diffstat (limited to 'weed/worker/tasks/balance/balance_register.go')
| -rw-r--r-- | weed/worker/tasks/balance/balance_register.go | 109 |
1 files changed, 50 insertions, 59 deletions
diff --git a/weed/worker/tasks/balance/balance_register.go b/weed/worker/tasks/balance/balance_register.go index 7c2d5a520..b26a40782 100644 --- a/weed/worker/tasks/balance/balance_register.go +++ b/weed/worker/tasks/balance/balance_register.go @@ -2,80 +2,71 @@ package balance import ( "fmt" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// Factory creates balance task instances -type Factory struct { - *tasks.BaseTaskFactory -} +// Global variable to hold the task definition for configuration updates +var globalTaskDef *base.TaskDefinition -// NewFactory creates a new balance task factory -func NewFactory() *Factory { - return &Factory{ - BaseTaskFactory: tasks.NewBaseTaskFactory( - types.TaskTypeBalance, - []string{"balance", "storage", "optimization"}, - "Balance data across volume servers for optimal performance", - ), - } -} - -// Create creates a new balance task instance -func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) { - // Validate parameters - if params.VolumeID == 0 { - return nil, fmt.Errorf("volume_id is required") - } - if params.Server == "" { - return nil, fmt.Errorf("server is required") - } - - task := NewTask(params.Server, params.VolumeID, params.Collection) - task.SetEstimatedDuration(task.EstimateTime(params)) +// Auto-register this task when the package is imported +func init() { + RegisterBalanceTask() - return task, nil + // Register config updater + tasks.AutoRegisterConfigUpdater(types.TaskTypeBalance, UpdateConfigFromPersistence) } -// Shared detector and scheduler instances -var ( - sharedDetector *BalanceDetector - sharedScheduler *BalanceScheduler -) +// RegisterBalanceTask registers the balance task with the new architecture +func RegisterBalanceTask() { + // Create configuration instance + config := NewDefaultConfig() -// getSharedInstances returns the shared detector and scheduler instances -func getSharedInstances() (*BalanceDetector, *BalanceScheduler) { - if sharedDetector == nil { - sharedDetector = NewBalanceDetector() - } - if sharedScheduler == nil { - sharedScheduler = NewBalanceScheduler() + // Create complete task definition + taskDef := &base.TaskDefinition{ + Type: types.TaskTypeBalance, + Name: "balance", + DisplayName: "Volume Balance", + Description: "Balances volume distribution across servers", + Icon: "fas fa-balance-scale text-warning", + Capabilities: []string{"balance", "distribution"}, + + Config: config, + ConfigSpec: GetConfigSpec(), + CreateTask: CreateTask, + DetectionFunc: Detection, + ScanInterval: 30 * time.Minute, + SchedulingFunc: Scheduling, + MaxConcurrent: 1, + RepeatInterval: 2 * time.Hour, } - return sharedDetector, sharedScheduler -} -// GetSharedInstances returns the shared detector and scheduler instances (public access) -func GetSharedInstances() (*BalanceDetector, *BalanceScheduler) { - return getSharedInstances() + // Store task definition globally for configuration updates + globalTaskDef = taskDef + + // Register everything with a single function call! + base.RegisterTask(taskDef) } -// Auto-register this task when the package is imported -func init() { - factory := NewFactory() - tasks.AutoRegister(types.TaskTypeBalance, factory) +// UpdateConfigFromPersistence updates the balance configuration from persistence +func UpdateConfigFromPersistence(configPersistence interface{}) error { + if globalTaskDef == nil { + return fmt.Errorf("balance task not registered") + } - // Get shared instances for all registrations - detector, scheduler := getSharedInstances() + // Load configuration from persistence + newConfig := LoadConfigFromPersistence(configPersistence) + if newConfig == nil { + return fmt.Errorf("failed to load configuration from persistence") + } - // Register with types registry - tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { - registry.RegisterTask(detector, scheduler) - }) + // Update the task definition's config + globalTaskDef.Config = newConfig - // Register with UI registry using the same instances - tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { - RegisterUI(uiRegistry, detector, scheduler) - }) + glog.V(1).Infof("Updated balance task configuration from persistence") + return nil } |
