aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/balance/balance_register.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/balance/balance_register.go')
-rw-r--r--weed/worker/tasks/balance/balance_register.go109
1 files changed, 50 insertions, 59 deletions
diff --git a/weed/worker/tasks/balance/balance_register.go b/weed/worker/tasks/balance/balance_register.go
index 7c2d5a520..b26a40782 100644
--- a/weed/worker/tasks/balance/balance_register.go
+++ b/weed/worker/tasks/balance/balance_register.go
@@ -2,80 +2,71 @@ package balance
import (
"fmt"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
-// Factory creates balance task instances
-type Factory struct {
- *tasks.BaseTaskFactory
-}
+// Global variable to hold the task definition for configuration updates
+var globalTaskDef *base.TaskDefinition
-// NewFactory creates a new balance task factory
-func NewFactory() *Factory {
- return &Factory{
- BaseTaskFactory: tasks.NewBaseTaskFactory(
- types.TaskTypeBalance,
- []string{"balance", "storage", "optimization"},
- "Balance data across volume servers for optimal performance",
- ),
- }
-}
-
-// Create creates a new balance task instance
-func (f *Factory) Create(params types.TaskParams) (types.TaskInterface, error) {
- // Validate parameters
- if params.VolumeID == 0 {
- return nil, fmt.Errorf("volume_id is required")
- }
- if params.Server == "" {
- return nil, fmt.Errorf("server is required")
- }
-
- task := NewTask(params.Server, params.VolumeID, params.Collection)
- task.SetEstimatedDuration(task.EstimateTime(params))
+// Auto-register this task when the package is imported
+func init() {
+ RegisterBalanceTask()
- return task, nil
+ // Register config updater
+ tasks.AutoRegisterConfigUpdater(types.TaskTypeBalance, UpdateConfigFromPersistence)
}
-// Shared detector and scheduler instances
-var (
- sharedDetector *BalanceDetector
- sharedScheduler *BalanceScheduler
-)
+// RegisterBalanceTask registers the balance task with the new architecture
+func RegisterBalanceTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
-// getSharedInstances returns the shared detector and scheduler instances
-func getSharedInstances() (*BalanceDetector, *BalanceScheduler) {
- if sharedDetector == nil {
- sharedDetector = NewBalanceDetector()
- }
- if sharedScheduler == nil {
- sharedScheduler = NewBalanceScheduler()
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeBalance,
+ Name: "balance",
+ DisplayName: "Volume Balance",
+ Description: "Balances volume distribution across servers",
+ Icon: "fas fa-balance-scale text-warning",
+ Capabilities: []string{"balance", "distribution"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: CreateTask,
+ DetectionFunc: Detection,
+ ScanInterval: 30 * time.Minute,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 1,
+ RepeatInterval: 2 * time.Hour,
}
- return sharedDetector, sharedScheduler
-}
-// GetSharedInstances returns the shared detector and scheduler instances (public access)
-func GetSharedInstances() (*BalanceDetector, *BalanceScheduler) {
- return getSharedInstances()
+ // Store task definition globally for configuration updates
+ globalTaskDef = taskDef
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
-// Auto-register this task when the package is imported
-func init() {
- factory := NewFactory()
- tasks.AutoRegister(types.TaskTypeBalance, factory)
+// UpdateConfigFromPersistence updates the balance configuration from persistence
+func UpdateConfigFromPersistence(configPersistence interface{}) error {
+ if globalTaskDef == nil {
+ return fmt.Errorf("balance task not registered")
+ }
- // Get shared instances for all registrations
- detector, scheduler := getSharedInstances()
+ // Load configuration from persistence
+ newConfig := LoadConfigFromPersistence(configPersistence)
+ if newConfig == nil {
+ return fmt.Errorf("failed to load configuration from persistence")
+ }
- // Register with types registry
- tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
- registry.RegisterTask(detector, scheduler)
- })
+ // Update the task definition's config
+ globalTaskDef.Config = newConfig
- // Register with UI registry using the same instances
- tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
- RegisterUI(uiRegistry, detector, scheduler)
- })
+ glog.V(1).Infof("Updated balance task configuration from persistence")
+ return nil
}