-rw-r--r--  weed/admin/handlers/maintenance_handlers.go      6
-rw-r--r--  weed/worker/tasks/balance/balance.go            263
-rw-r--r--  weed/worker/tasks/balance/balance_register.go    38
-rw-r--r--  weed/worker/tasks/balance/config.go             110
-rw-r--r--  weed/worker/tasks/balance/detection.go          135
-rw-r--r--  weed/worker/tasks/erasure_coding/config.go      125
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go   126
-rw-r--r--  weed/worker/tasks/erasure_coding/ec.go          270
-rw-r--r--  weed/worker/tasks/erasure_coding/ec_register.go  38
-rw-r--r--  weed/worker/tasks/vacuum/config.go              128
-rw-r--r--  weed/worker/tasks/vacuum/detection.go            99
-rw-r--r--  weed/worker/tasks/vacuum/vacuum.go              246
-rw-r--r--  weed/worker/tasks/vacuum/vacuum_register.go      38
13 files changed, 834 insertions(+), 788 deletions(-)
diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go
index a587d1b96..5995d2eef 100644
--- a/weed/admin/handlers/maintenance_handlers.go
+++ b/weed/admin/handlers/maintenance_handlers.go
@@ -192,11 +192,11 @@ func (h *MaintenanceHandlers) UpdateTaskConfig(c *gin.Context) {
var config interface{}
switch taskType {
case types.TaskTypeVacuum:
- config = &vacuum.VacuumConfig{}
+ config = &vacuum.Config{}
case types.TaskTypeBalance:
- config = &balance.BalanceConfig{}
+ config = &balance.Config{}
case types.TaskTypeErasureCoding:
- config = &erasure_coding.ErasureCodingConfig{}
+ config = &erasure_coding.Config{}
default:
c.JSON(http.StatusBadRequest, gin.H{"error": "Unsupported task type: " + taskTypeName})
return
diff --git a/weed/worker/tasks/balance/balance.go b/weed/worker/tasks/balance/balance.go
index 9e8fc7093..ea867d950 100644
--- a/weed/worker/tasks/balance/balance.go
+++ b/weed/worker/tasks/balance/balance.go
@@ -4,10 +4,8 @@ import (
"fmt"
"time"
- "github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -82,264 +80,3 @@ func (t *Task) EstimateTime(params types.TaskParams) time.Duration {
// Could adjust based on volume size or cluster state
return baseTime
}
-
-// BalanceConfig extends BaseConfig with balance-specific settings
-type BalanceConfig struct {
- base.BaseConfig
- ImbalanceThreshold float64 `json:"imbalance_threshold"`
- MinServerCount int `json:"min_server_count"`
-}
-
-// balanceDetection implements the detection logic for balance tasks
-func balanceDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
- if !config.IsEnabled() {
- return nil, nil
- }
-
- balanceConfig := config.(*BalanceConfig)
-
- // Skip if cluster is too small
- minVolumeCount := 10
- if len(metrics) < minVolumeCount {
- return nil, nil
- }
-
- // Analyze volume distribution across servers
- serverVolumeCounts := make(map[string]int)
- for _, metric := range metrics {
- serverVolumeCounts[metric.Server]++
- }
-
- if len(serverVolumeCounts) < balanceConfig.MinServerCount {
- return nil, nil
- }
-
- // Calculate balance metrics
- totalVolumes := len(metrics)
- avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
-
- maxVolumes := 0
- minVolumes := totalVolumes
- maxServer := ""
- minServer := ""
-
- for server, count := range serverVolumeCounts {
- if count > maxVolumes {
- maxVolumes = count
- maxServer = server
- }
- if count < minVolumes {
- minVolumes = count
- minServer = server
- }
- }
-
- // Check if imbalance exceeds threshold
- imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
- if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
- return nil, nil
- }
-
- // Create balance task
- reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
- imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
-
- task := &types.TaskDetectionResult{
- TaskType: types.TaskTypeBalance,
- Priority: types.TaskPriorityNormal,
- Reason: reason,
- ScheduleAt: time.Now(),
- Parameters: map[string]interface{}{
- "imbalance_ratio": imbalanceRatio,
- "threshold": balanceConfig.ImbalanceThreshold,
- "max_volumes": maxVolumes,
- "min_volumes": minVolumes,
- "avg_volumes_per_server": avgVolumesPerServer,
- "max_server": maxServer,
- "min_server": minServer,
- "total_servers": len(serverVolumeCounts),
- },
- }
-
- return []*types.TaskDetectionResult{task}, nil
-}
-
-// balanceScheduling implements the scheduling logic for balance tasks
-func balanceScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
- balanceConfig := config.(*BalanceConfig)
-
- // Count running balance tasks
- runningBalanceCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeBalance {
- runningBalanceCount++
- }
- }
-
- // Check concurrency limit
- if runningBalanceCount >= balanceConfig.MaxConcurrent {
- return false
- }
-
- // Check if we have available workers
- availableWorkerCount := 0
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeBalance {
- availableWorkerCount++
- break
- }
- }
- }
-
- return availableWorkerCount > 0
-}
-
-// createBalanceTask creates a new balance task instance
-func createBalanceTask(params types.TaskParams) (types.TaskInterface, error) {
- // Extract configuration from params
- var config *BalanceConfig
- if configData, ok := params.Parameters["config"]; ok {
- if configMap, ok := configData.(map[string]interface{}); ok {
- config = &BalanceConfig{}
- if err := config.FromMap(configMap); err != nil {
- return nil, fmt.Errorf("failed to parse balance config: %v", err)
- }
- }
- }
-
- if config == nil {
- config = &BalanceConfig{
- BaseConfig: base.BaseConfig{
- Enabled: true,
- ScanIntervalSeconds: 30 * 60, // 30 minutes
- MaxConcurrent: 1,
- },
- ImbalanceThreshold: 0.2, // 20%
- MinServerCount: 2,
- }
- }
-
- // Create and return the balance task using existing Task type
- return NewTask(params.Server, params.VolumeID, params.Collection), nil
-}
-
-// getBalanceConfigSpec returns the configuration schema for balance tasks
-func getBalanceConfigSpec() base.ConfigSpec {
- return base.ConfigSpec{
- Fields: []*config.Field{
- {
- Name: "enabled",
- JSONName: "enabled",
- Type: config.FieldTypeBool,
- DefaultValue: true,
- Required: false,
- DisplayName: "Enable Balance Tasks",
- Description: "Whether balance tasks should be automatically created",
- HelpText: "Toggle this to enable or disable automatic balance task generation",
- InputType: "checkbox",
- CSSClasses: "form-check-input",
- },
- {
- Name: "scan_interval_seconds",
- JSONName: "scan_interval_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 30 * 60,
- MinValue: 5 * 60,
- MaxValue: 2 * 60 * 60,
- Required: true,
- DisplayName: "Scan Interval",
- Description: "How often to scan for volume distribution imbalances",
- HelpText: "The system will check for volume distribution imbalances at this interval",
- Placeholder: "30",
- Unit: config.UnitMinutes,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "max_concurrent",
- JSONName: "max_concurrent",
- Type: config.FieldTypeInt,
- DefaultValue: 1,
- MinValue: 1,
- MaxValue: 3,
- Required: true,
- DisplayName: "Max Concurrent Tasks",
- Description: "Maximum number of balance tasks that can run simultaneously",
- HelpText: "Limits the number of balance operations running at the same time",
- Placeholder: "1 (default)",
- Unit: config.UnitCount,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "imbalance_threshold",
- JSONName: "imbalance_threshold",
- Type: config.FieldTypeFloat,
- DefaultValue: 0.2,
- MinValue: 0.05,
- MaxValue: 0.5,
- Required: true,
- DisplayName: "Imbalance Threshold",
- Description: "Minimum imbalance ratio to trigger balancing",
- HelpText: "Volume distribution imbalances above this threshold will trigger balancing",
- Placeholder: "0.20 (20%)",
- Unit: config.UnitNone,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "min_server_count",
- JSONName: "min_server_count",
- Type: config.FieldTypeInt,
- DefaultValue: 2,
- MinValue: 2,
- MaxValue: 10,
- Required: true,
- DisplayName: "Minimum Server Count",
- Description: "Minimum number of servers required for balancing",
- HelpText: "Balancing will only occur if there are at least this many servers",
- Placeholder: "2 (default)",
- Unit: config.UnitCount,
- InputType: "number",
- CSSClasses: "form-control",
- },
- },
- }
-}
-
-// initBalance registers the refactored balance task
-func initBalance() {
- // Create configuration instance
- config := &BalanceConfig{
- BaseConfig: base.BaseConfig{
- Enabled: true,
- ScanIntervalSeconds: 30 * 60, // 30 minutes
- MaxConcurrent: 1,
- },
- ImbalanceThreshold: 0.2, // 20%
- MinServerCount: 2,
- }
-
- // Create complete task definition
- taskDef := &base.TaskDefinition{
- Type: types.TaskTypeBalance,
- Name: "balance",
- DisplayName: "Volume Balance",
- Description: "Balances volume distribution across servers",
- Icon: "fas fa-balance-scale text-warning",
- Capabilities: []string{"balance", "distribution"},
-
- Config: config,
- ConfigSpec: getBalanceConfigSpec(),
- CreateTask: createBalanceTask,
- DetectionFunc: balanceDetection,
- ScanInterval: 30 * time.Minute,
- SchedulingFunc: balanceScheduling,
- MaxConcurrent: 1,
- RepeatInterval: 2 * time.Hour,
- }
-
- // Register everything with a single function call!
- base.RegisterTask(taskDef)
-}
diff --git a/weed/worker/tasks/balance/balance_register.go b/weed/worker/tasks/balance/balance_register.go
index 5df8e9631..6a23cbbbd 100644
--- a/weed/worker/tasks/balance/balance_register.go
+++ b/weed/worker/tasks/balance/balance_register.go
@@ -1,7 +1,41 @@
package balance
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
// Auto-register this task when the package is imported
func init() {
- // Use new architecture instead of old registration
- initBalance()
+ RegisterBalanceTask()
+}
+
+// RegisterBalanceTask registers the balance task with the new architecture
+func RegisterBalanceTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
+
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeBalance,
+ Name: "balance",
+ DisplayName: "Volume Balance",
+ Description: "Balances volume distribution across servers",
+ Icon: "fas fa-balance-scale text-warning",
+ Capabilities: []string{"balance", "distribution"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: CreateTask,
+ DetectionFunc: Detection,
+ ScanInterval: 30 * time.Minute,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 1,
+ RepeatInterval: 2 * time.Hour,
+ }
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
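
Since each task package registers itself from init(), callers never invoke RegisterBalanceTask directly; importing the package for its side effects is enough. A minimal sketch of such an aggregator file (the package name and location are assumptions, not part of this change):

package tasks

import (
	// Blank imports trigger each package's init(), which calls base.RegisterTask.
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
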
diff --git a/weed/worker/tasks/balance/config.go b/weed/worker/tasks/balance/config.go
new file mode 100644
index 000000000..9ee1fa777
--- /dev/null
+++ b/weed/worker/tasks/balance/config.go
@@ -0,0 +1,110 @@
+package balance
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with balance-specific settings
+type Config struct {
+ base.BaseConfig
+ ImbalanceThreshold float64 `json:"imbalance_threshold"`
+ MinServerCount int `json:"min_server_count"`
+}
+
+// NewDefaultConfig creates a new default balance configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 30 * 60, // 30 minutes
+ MaxConcurrent: 1,
+ },
+ ImbalanceThreshold: 0.2, // 20%
+ MinServerCount: 2,
+ }
+}
+
+// GetConfigSpec returns the configuration schema for balance tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Balance Tasks",
+ Description: "Whether balance tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic balance task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 30 * 60,
+ MinValue: 5 * 60,
+ MaxValue: 2 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volume distribution imbalances",
+ HelpText: "The system will check for volume distribution imbalances at this interval",
+ Placeholder: "30",
+ Unit: config.UnitMinutes,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 1,
+ MinValue: 1,
+ MaxValue: 3,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of balance tasks that can run simultaneously",
+ HelpText: "Limits the number of balance operations running at the same time",
+ Placeholder: "1 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "imbalance_threshold",
+ JSONName: "imbalance_threshold",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.2,
+ MinValue: 0.05,
+ MaxValue: 0.5,
+ Required: true,
+ DisplayName: "Imbalance Threshold",
+ Description: "Minimum imbalance ratio to trigger balancing",
+ HelpText: "Volume distribution imbalances above this threshold will trigger balancing",
+ Placeholder: "0.20 (20%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_server_count",
+ JSONName: "min_server_count",
+ Type: config.FieldTypeInt,
+ DefaultValue: 2,
+ MinValue: 2,
+ MaxValue: 10,
+ Required: true,
+ DisplayName: "Minimum Server Count",
+ Description: "Minimum number of servers required for balancing",
+ HelpText: "Balancing will only occur if there are at least this many servers",
+ Placeholder: "2 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
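
The JSON field names above ("imbalance_threshold", "min_server_count", plus the BaseConfig keys) are what CreateTask in detection.go feeds into Config.FromMap. A hedged sketch of that round trip, assuming FromMap accepts a partial map and leaves the remaining defaults untouched:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
)

func main() {
	// Start from the defaults, then override from a generic map as CreateTask does.
	cfg := balance.NewDefaultConfig()
	if err := cfg.FromMap(map[string]interface{}{
		"imbalance_threshold": 0.3, // require a 30% spread before balancing
		"min_server_count":    3,
	}); err != nil {
		fmt.Println("failed to parse balance config:", err)
		return
	}
	fmt.Printf("threshold=%.2f, min servers=%d\n", cfg.ImbalanceThreshold, cfg.MinServerCount)
}
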
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
new file mode 100644
index 000000000..6249d8fd7
--- /dev/null
+++ b/weed/worker/tasks/balance/detection.go
@@ -0,0 +1,135 @@
+package balance
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for balance tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ balanceConfig := config.(*Config)
+
+ // Skip if cluster is too small
+ minVolumeCount := 10
+ if len(metrics) < minVolumeCount {
+ return nil, nil
+ }
+
+ // Analyze volume distribution across servers
+ serverVolumeCounts := make(map[string]int)
+ for _, metric := range metrics {
+ serverVolumeCounts[metric.Server]++
+ }
+
+ if len(serverVolumeCounts) < balanceConfig.MinServerCount {
+ return nil, nil
+ }
+
+ // Calculate balance metrics
+ totalVolumes := len(metrics)
+ avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
+
+ maxVolumes := 0
+ minVolumes := totalVolumes
+ maxServer := ""
+ minServer := ""
+
+ for server, count := range serverVolumeCounts {
+ if count > maxVolumes {
+ maxVolumes = count
+ maxServer = server
+ }
+ if count < minVolumes {
+ minVolumes = count
+ minServer = server
+ }
+ }
+
+ // Check if imbalance exceeds threshold
+ imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
+ if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
+ return nil, nil
+ }
+
+ // Create balance task
+ reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
+ imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+
+ task := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeBalance,
+ Priority: types.TaskPriorityNormal,
+ Reason: reason,
+ ScheduleAt: time.Now(),
+ Parameters: map[string]interface{}{
+ "imbalance_ratio": imbalanceRatio,
+ "threshold": balanceConfig.ImbalanceThreshold,
+ "max_volumes": maxVolumes,
+ "min_volumes": minVolumes,
+ "avg_volumes_per_server": avgVolumesPerServer,
+ "max_server": maxServer,
+ "min_server": minServer,
+ "total_servers": len(serverVolumeCounts),
+ },
+ }
+
+ return []*types.TaskDetectionResult{task}, nil
+}
+
+// Scheduling implements the scheduling logic for balance tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ balanceConfig := config.(*Config)
+
+ // Count running balance tasks
+ runningBalanceCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeBalance {
+ runningBalanceCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningBalanceCount >= balanceConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check if we have available workers
+ availableWorkerCount := 0
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeBalance {
+ availableWorkerCount++
+ break
+ }
+ }
+ }
+
+ return availableWorkerCount > 0
+}
+
+// CreateTask creates a new balance task instance
+func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Extract configuration from params
+ var config *Config
+ if configData, ok := params.Parameters["config"]; ok {
+ if configMap, ok := configData.(map[string]interface{}); ok {
+ config = &Config{}
+ if err := config.FromMap(configMap); err != nil {
+ return nil, fmt.Errorf("failed to parse balance config: %v", err)
+ }
+ }
+ }
+
+ if config == nil {
+ config = NewDefaultConfig()
+ }
+
+ // Create and return the balance task using existing Task type
+ return NewTask(params.Server, params.VolumeID, params.Collection), nil
+}
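
A worked example of the arithmetic in Detection: three servers holding 30, 20, and 10 volumes average 20 volumes each, so the imbalance ratio is (30-10)/20 = 1.0, well above the 0.2 default, and a balance task is created. A standalone sketch of the same calculation with hypothetical counts:

package main

import "fmt"

func main() {
	counts := []int{30, 20, 10} // volumes per server (hypothetical)

	total := 0
	for _, c := range counts {
		total += c
	}
	maxC, minC := counts[0], counts[0]
	for _, c := range counts {
		if c > maxC {
			maxC = c
		}
		if c < minC {
			minC = c
		}
	}

	// Same formula as Detection: ratio = (max - min) / average volumes per server.
	avg := float64(total) / float64(len(counts))
	ratio := float64(maxC-minC) / avg
	fmt.Printf("avg=%.1f ratio=%.2f trigger=%v\n", avg, ratio, ratio > 0.2)
	// Output: avg=20.0 ratio=1.00 trigger=true
}
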
diff --git a/weed/worker/tasks/erasure_coding/config.go b/weed/worker/tasks/erasure_coding/config.go
new file mode 100644
index 000000000..2eaaccdba
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/config.go
@@ -0,0 +1,125 @@
+package erasure_coding
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with erasure coding specific settings
+type Config struct {
+ base.BaseConfig
+ QuietForSeconds int `json:"quiet_for_seconds"`
+ FullnessRatio float64 `json:"fullness_ratio"`
+ CollectionFilter string `json:"collection_filter"`
+}
+
+// NewDefaultConfig creates a new default erasure coding configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 60 * 60, // 1 hour
+ MaxConcurrent: 1,
+ },
+ QuietForSeconds: 300, // 5 minutes
+ FullnessRatio: 0.8, // 80%
+ CollectionFilter: "",
+ }
+}
+
+// GetConfigSpec returns the configuration schema for erasure coding tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Erasure Coding Tasks",
+ Description: "Whether erasure coding tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic erasure coding task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 60 * 60,
+ MinValue: 10 * 60,
+ MaxValue: 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing erasure coding",
+ HelpText: "The system will check for volumes that need erasure coding at this interval",
+ Placeholder: "1",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 1,
+ MinValue: 1,
+ MaxValue: 5,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of erasure coding tasks that can run simultaneously",
+ HelpText: "Limits the number of erasure coding operations running at the same time",
+ Placeholder: "1 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "quiet_for_seconds",
+ JSONName: "quiet_for_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 300,
+ MinValue: 60,
+ MaxValue: 3600,
+ Required: true,
+ DisplayName: "Quiet Period",
+ Description: "Minimum time volume must be quiet before erasure coding",
+ HelpText: "Volume must not be modified for this duration before erasure coding",
+ Placeholder: "5",
+ Unit: config.UnitMinutes,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "fullness_ratio",
+ JSONName: "fullness_ratio",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.8,
+ MinValue: 0.1,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Fullness Ratio",
+ Description: "Minimum fullness ratio to trigger erasure coding",
+ HelpText: "Only volumes with this fullness ratio or higher will be erasure coded",
+ Placeholder: "0.80 (80%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "collection_filter",
+ JSONName: "collection_filter",
+ Type: config.FieldTypeString,
+ DefaultValue: "",
+ Required: false,
+ DisplayName: "Collection Filter",
+ Description: "Only process volumes from specific collections",
+ HelpText: "Leave empty to process all collections, or specify collection name",
+ Placeholder: "my_collection",
+ InputType: "text",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
new file mode 100644
index 000000000..756da6fea
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -0,0 +1,126 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for erasure coding tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ ecConfig := config.(*Config)
+ var results []*types.TaskDetectionResult
+ now := time.Now()
+ quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
+ minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum
+
+ for _, metric := range metrics {
+ // Skip if already EC volume
+ if metric.IsECVolume {
+ continue
+ }
+
+ // Check minimum size requirement
+ if metric.Size < minSizeBytes {
+ continue
+ }
+
+ // Check collection filter if specified
+ if ecConfig.CollectionFilter != "" {
+ // Parse comma-separated collections
+ allowedCollections := make(map[string]bool)
+ for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
+ allowedCollections[strings.TrimSpace(collection)] = true
+ }
+ // Skip if volume's collection is not in the allowed list
+ if !allowedCollections[metric.Collection] {
+ continue
+ }
+ }
+
+ // Check quiet duration and fullness criteria
+ if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeErasureCoding,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: types.TaskPriorityLow, // EC is not urgent
+ Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)",
+ metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
+ float64(metric.Size)/(1024*1024)),
+ Parameters: map[string]interface{}{
+ "age_seconds": int(metric.Age.Seconds()),
+ "fullness_ratio": metric.FullnessRatio,
+ "size_mb": int(metric.Size / (1024 * 1024)),
+ },
+ ScheduleAt: now,
+ }
+ results = append(results, result)
+ }
+ }
+
+ return results, nil
+}
+
+// Scheduling implements the scheduling logic for erasure coding tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ ecConfig := config.(*Config)
+
+ // Check if we have available workers
+ if len(availableWorkers) == 0 {
+ return false
+ }
+
+ // Count running EC tasks
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeErasureCoding {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningCount >= ecConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check if any worker can handle EC tasks
+ for _, worker := range availableWorkers {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeErasureCoding {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// CreateTask creates a new erasure coding task instance
+func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Extract configuration from params
+ var config *Config
+ if configData, ok := params.Parameters["config"]; ok {
+ if configMap, ok := configData.(map[string]interface{}); ok {
+ config = &Config{}
+ if err := config.FromMap(configMap); err != nil {
+ return nil, fmt.Errorf("failed to parse erasure coding config: %v", err)
+ }
+ }
+ }
+
+ if config == nil {
+ config = NewDefaultConfig()
+ }
+
+ // Create and return the erasure coding task using existing Task type
+ return NewTask(params.Server, params.VolumeID), nil
+}
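
To make the EC criteria concrete: with the defaults, a non-EC volume is picked up only if it is at least 100MB, has been quiet for 300 seconds or more, and is at least 80% full. A small sketch of that check using hypothetical volume numbers:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults from NewDefaultConfig plus the hard-coded size floor in Detection.
	quietFor := 300 * time.Second
	fullnessRatio := 0.8
	minSizeBytes := uint64(100) * 1024 * 1024

	// Hypothetical volume: 150MB, 85% full, last modified 10 minutes ago.
	size := uint64(150) * 1024 * 1024
	fullness := 0.85
	age := 10 * time.Minute

	qualifies := size >= minSizeBytes && age >= quietFor && fullness >= fullnessRatio
	fmt.Println("qualifies for erasure coding:", qualifies) // true
}
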
diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go
index bbf15c079..61c13188f 100644
--- a/weed/worker/tasks/erasure_coding/ec.go
+++ b/weed/worker/tasks/erasure_coding/ec.go
@@ -7,13 +7,11 @@ import (
"math"
"os"
"path/filepath"
- "strings"
"sync"
"time"
"google.golang.org/grpc"
- "github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -23,7 +21,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc/credentials/insecure"
)
@@ -1019,270 +1016,3 @@ func (t *Task) mountShardOnServer(targetServer pb.ServerAddress, shardId uint32)
glog.V(1).Infof("MOUNT SUCCESS: Shard %d successfully mounted on %s", shardId, targetServer)
return nil
}
-
-// ErasureCodingConfig extends BaseConfig with erasure coding specific settings
-type ErasureCodingConfig struct {
- base.BaseConfig
- QuietForSeconds int `json:"quiet_for_seconds"`
- FullnessRatio float64 `json:"fullness_ratio"`
- CollectionFilter string `json:"collection_filter"`
-}
-
-// ecDetection implements the detection logic for erasure coding tasks
-func ecDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
- if !config.IsEnabled() {
- return nil, nil
- }
-
- ecConfig := config.(*ErasureCodingConfig)
- var results []*types.TaskDetectionResult
- now := time.Now()
- quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
- minSizeBytes := uint64(100) * 1024 * 1024 // 100MB minimum
-
- for _, metric := range metrics {
- // Skip if already EC volume
- if metric.IsECVolume {
- continue
- }
-
- // Check minimum size requirement
- if metric.Size < minSizeBytes {
- continue
- }
-
- // Check collection filter if specified
- if ecConfig.CollectionFilter != "" {
- // Parse comma-separated collections
- allowedCollections := make(map[string]bool)
- for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
- allowedCollections[strings.TrimSpace(collection)] = true
- }
- // Skip if volume's collection is not in the allowed list
- if !allowedCollections[metric.Collection] {
- continue
- }
- }
-
- // Check quiet duration and fullness criteria
- if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
- result := &types.TaskDetectionResult{
- TaskType: types.TaskTypeErasureCoding,
- VolumeID: metric.VolumeID,
- Server: metric.Server,
- Collection: metric.Collection,
- Priority: types.TaskPriorityLow, // EC is not urgent
- Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>100MB)",
- metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
- float64(metric.Size)/(1024*1024)),
- Parameters: map[string]interface{}{
- "age_seconds": int(metric.Age.Seconds()),
- "fullness_ratio": metric.FullnessRatio,
- "size_mb": int(metric.Size / (1024 * 1024)),
- },
- ScheduleAt: now,
- }
- results = append(results, result)
- }
- }
-
- return results, nil
-}
-
-// ecScheduling implements the scheduling logic for erasure coding tasks
-func ecScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
- ecConfig := config.(*ErasureCodingConfig)
-
- // Check if we have available workers
- if len(availableWorkers) == 0 {
- return false
- }
-
- // Count running EC tasks
- runningCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeErasureCoding {
- runningCount++
- }
- }
-
- // Check concurrency limit
- if runningCount >= ecConfig.MaxConcurrent {
- return false
- }
-
- // Check if any worker can handle EC tasks
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeErasureCoding {
- return true
- }
- }
- }
-
- return false
-}
-
-// createErasureCodingTask creates a new erasure coding task instance
-func createErasureCodingTask(params types.TaskParams) (types.TaskInterface, error) {
- // Extract configuration from params
- var config *ErasureCodingConfig
- if configData, ok := params.Parameters["config"]; ok {
- if configMap, ok := configData.(map[string]interface{}); ok {
- config = &ErasureCodingConfig{}
- if err := config.FromMap(configMap); err != nil {
- return nil, fmt.Errorf("failed to parse erasure coding config: %v", err)
- }
- }
- }
-
- if config == nil {
- config = &ErasureCodingConfig{
- BaseConfig: base.BaseConfig{
- Enabled: true,
- ScanIntervalSeconds: 60 * 60, // 1 hour
- MaxConcurrent: 1,
- },
- QuietForSeconds: 300, // 5 minutes
- FullnessRatio: 0.8, // 80%
- CollectionFilter: "",
- }
- }
-
- // Create and return the erasure coding task using existing Task type
- return NewTask(params.Server, params.VolumeID), nil
-}
-
-// getErasureCodingConfigSpec returns the configuration schema for erasure coding tasks
-func getErasureCodingConfigSpec() base.ConfigSpec {
- return base.ConfigSpec{
- Fields: []*config.Field{
- {
- Name: "enabled",
- JSONName: "enabled",
- Type: config.FieldTypeBool,
- DefaultValue: true,
- Required: false,
- DisplayName: "Enable Erasure Coding Tasks",
- Description: "Whether erasure coding tasks should be automatically created",
- HelpText: "Toggle this to enable or disable automatic erasure coding task generation",
- InputType: "checkbox",
- CSSClasses: "form-check-input",
- },
- {
- Name: "scan_interval_seconds",
- JSONName: "scan_interval_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 60 * 60,
- MinValue: 10 * 60,
- MaxValue: 24 * 60 * 60,
- Required: true,
- DisplayName: "Scan Interval",
- Description: "How often to scan for volumes needing erasure coding",
- HelpText: "The system will check for volumes that need erasure coding at this interval",
- Placeholder: "1",
- Unit: config.UnitHours,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "max_concurrent",
- JSONName: "max_concurrent",
- Type: config.FieldTypeInt,
- DefaultValue: 1,
- MinValue: 1,
- MaxValue: 5,
- Required: true,
- DisplayName: "Max Concurrent Tasks",
- Description: "Maximum number of erasure coding tasks that can run simultaneously",
- HelpText: "Limits the number of erasure coding operations running at the same time",
- Placeholder: "1 (default)",
- Unit: config.UnitCount,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "quiet_for_seconds",
- JSONName: "quiet_for_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 300,
- MinValue: 60,
- MaxValue: 3600,
- Required: true,
- DisplayName: "Quiet Period",
- Description: "Minimum time volume must be quiet before erasure coding",
- HelpText: "Volume must not be modified for this duration before erasure coding",
- Placeholder: "5",
- Unit: config.UnitMinutes,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "fullness_ratio",
- JSONName: "fullness_ratio",
- Type: config.FieldTypeFloat,
- DefaultValue: 0.8,
- MinValue: 0.1,
- MaxValue: 1.0,
- Required: true,
- DisplayName: "Fullness Ratio",
- Description: "Minimum fullness ratio to trigger erasure coding",
- HelpText: "Only volumes with this fullness ratio or higher will be erasure coded",
- Placeholder: "0.80 (80%)",
- Unit: config.UnitNone,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "collection_filter",
- JSONName: "collection_filter",
- Type: config.FieldTypeString,
- DefaultValue: "",
- Required: false,
- DisplayName: "Collection Filter",
- Description: "Only process volumes from specific collections",
- HelpText: "Leave empty to process all collections, or specify collection name",
- Placeholder: "my_collection",
- InputType: "text",
- CSSClasses: "form-control",
- },
- },
- }
-}
-
-// initErasureCoding registers the refactored erasure coding task
-func initErasureCoding() {
- // Create configuration instance
- config := &ErasureCodingConfig{
- BaseConfig: base.BaseConfig{
- Enabled: true,
- ScanIntervalSeconds: 60 * 60, // 1 hour
- MaxConcurrent: 1,
- },
- QuietForSeconds: 300, // 5 minutes
- FullnessRatio: 0.8, // 80%
- CollectionFilter: "",
- }
-
- // Create complete task definition
- taskDef := &base.TaskDefinition{
- Type: types.TaskTypeErasureCoding,
- Name: "erasure_coding",
- DisplayName: "Erasure Coding",
- Description: "Applies erasure coding to volumes for data protection",
- Icon: "fas fa-shield-alt text-success",
- Capabilities: []string{"erasure_coding", "data_protection"},
-
- Config: config,
- ConfigSpec: getErasureCodingConfigSpec(),
- CreateTask: createErasureCodingTask,
- DetectionFunc: ecDetection,
- ScanInterval: 1 * time.Hour,
- SchedulingFunc: ecScheduling,
- MaxConcurrent: 1,
- RepeatInterval: 24 * time.Hour,
- }
-
- // Register everything with a single function call!
- base.RegisterTask(taskDef)
-}
diff --git a/weed/worker/tasks/erasure_coding/ec_register.go b/weed/worker/tasks/erasure_coding/ec_register.go
index 5c68741e3..807e9705d 100644
--- a/weed/worker/tasks/erasure_coding/ec_register.go
+++ b/weed/worker/tasks/erasure_coding/ec_register.go
@@ -1,7 +1,41 @@
package erasure_coding
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
// Auto-register this task when the package is imported
func init() {
- // Use new architecture instead of old registration
- initErasureCoding()
+ RegisterErasureCodingTask()
+}
+
+// RegisterErasureCodingTask registers the erasure coding task with the new architecture
+func RegisterErasureCodingTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
+
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeErasureCoding,
+ Name: "erasure_coding",
+ DisplayName: "Erasure Coding",
+ Description: "Applies erasure coding to volumes for data protection",
+ Icon: "fas fa-shield-alt text-success",
+ Capabilities: []string{"erasure_coding", "data_protection"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: CreateTask,
+ DetectionFunc: Detection,
+ ScanInterval: 1 * time.Hour,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 1,
+ RepeatInterval: 24 * time.Hour,
+ }
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}
diff --git a/weed/worker/tasks/vacuum/config.go b/weed/worker/tasks/vacuum/config.go
new file mode 100644
index 000000000..39ce10c3d
--- /dev/null
+++ b/weed/worker/tasks/vacuum/config.go
@@ -0,0 +1,128 @@
+package vacuum
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+)
+
+// Config extends BaseConfig with vacuum-specific settings
+type Config struct {
+ base.BaseConfig
+ GarbageThreshold float64 `json:"garbage_threshold"`
+ MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
+ MinIntervalSeconds int `json:"min_interval_seconds"`
+}
+
+// NewDefaultConfig creates a new default vacuum configuration
+func NewDefaultConfig() *Config {
+ return &Config{
+ BaseConfig: base.BaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
+ MaxConcurrent: 2,
+ },
+ GarbageThreshold: 0.3, // 30%
+ MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
+ MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
+ }
+}
+
+// GetConfigSpec returns the configuration schema for vacuum tasks
+func GetConfigSpec() base.ConfigSpec {
+ return base.ConfigSpec{
+ Fields: []*config.Field{
+ {
+ Name: "enabled",
+ JSONName: "enabled",
+ Type: config.FieldTypeBool,
+ DefaultValue: true,
+ Required: false,
+ DisplayName: "Enable Vacuum Tasks",
+ Description: "Whether vacuum tasks should be automatically created",
+ HelpText: "Toggle this to enable or disable automatic vacuum task generation",
+ InputType: "checkbox",
+ CSSClasses: "form-check-input",
+ },
+ {
+ Name: "scan_interval_seconds",
+ JSONName: "scan_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 2 * 60 * 60,
+ MinValue: 10 * 60,
+ MaxValue: 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Scan Interval",
+ Description: "How often to scan for volumes needing vacuum",
+ HelpText: "The system will check for volumes that need vacuuming at this interval",
+ Placeholder: "2",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "max_concurrent",
+ JSONName: "max_concurrent",
+ Type: config.FieldTypeInt,
+ DefaultValue: 2,
+ MinValue: 1,
+ MaxValue: 10,
+ Required: true,
+ DisplayName: "Max Concurrent Tasks",
+ Description: "Maximum number of vacuum tasks that can run simultaneously",
+ HelpText: "Limits the number of vacuum operations running at the same time to control system load",
+ Placeholder: "2 (default)",
+ Unit: config.UnitCount,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "garbage_threshold",
+ JSONName: "garbage_threshold",
+ Type: config.FieldTypeFloat,
+ DefaultValue: 0.3,
+ MinValue: 0.0,
+ MaxValue: 1.0,
+ Required: true,
+ DisplayName: "Garbage Percentage Threshold",
+ Description: "Trigger vacuum when garbage ratio exceeds this percentage",
+ HelpText: "Volumes with more deleted content than this threshold will be vacuumed",
+ Placeholder: "0.30 (30%)",
+ Unit: config.UnitNone,
+ InputType: "number",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_volume_age_seconds",
+ JSONName: "min_volume_age_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 24 * 60 * 60,
+ MinValue: 1 * 60 * 60,
+ MaxValue: 7 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Volume Age",
+ Description: "Only vacuum volumes older than this duration",
+ HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to",
+ Placeholder: "24",
+ Unit: config.UnitHours,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ {
+ Name: "min_interval_seconds",
+ JSONName: "min_interval_seconds",
+ Type: config.FieldTypeInterval,
+ DefaultValue: 7 * 24 * 60 * 60,
+ MinValue: 1 * 24 * 60 * 60,
+ MaxValue: 30 * 24 * 60 * 60,
+ Required: true,
+ DisplayName: "Minimum Interval",
+ Description: "Minimum time between vacuum operations on the same volume",
+ HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
+ Placeholder: "7",
+ Unit: config.UnitDays,
+ InputType: "interval",
+ CSSClasses: "form-control",
+ },
+ },
+ }
+}
diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go
new file mode 100644
index 000000000..3ae23814a
--- /dev/null
+++ b/weed/worker/tasks/vacuum/detection.go
@@ -0,0 +1,99 @@
+package vacuum
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// Detection implements the detection logic for vacuum tasks
+func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
+ if !config.IsEnabled() {
+ return nil, nil
+ }
+
+ vacuumConfig := config.(*Config)
+ var results []*types.TaskDetectionResult
+ minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
+
+ for _, metric := range metrics {
+ // Check if volume needs vacuum
+ if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
+ priority := types.TaskPriorityNormal
+ if metric.GarbageRatio > 0.6 {
+ priority = types.TaskPriorityHigh
+ }
+
+ result := &types.TaskDetectionResult{
+ TaskType: types.TaskTypeVacuum,
+ VolumeID: metric.VolumeID,
+ Server: metric.Server,
+ Collection: metric.Collection,
+ Priority: priority,
+ Reason: "Volume has excessive garbage requiring vacuum",
+ Parameters: map[string]interface{}{
+ "garbage_ratio": metric.GarbageRatio,
+ "volume_age": metric.Age.String(),
+ },
+ ScheduleAt: time.Now(),
+ }
+ results = append(results, result)
+ }
+ }
+
+ return results, nil
+}
+
+// Scheduling implements the scheduling logic for vacuum tasks
+func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
+ vacuumConfig := config.(*Config)
+
+ // Count running vacuum tasks
+ runningVacuumCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == types.TaskTypeVacuum {
+ runningVacuumCount++
+ }
+ }
+
+ // Check concurrency limit
+ if runningVacuumCount >= vacuumConfig.MaxConcurrent {
+ return false
+ }
+
+ // Check for available workers with vacuum capability
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == types.TaskTypeVacuum {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// CreateTask creates a new vacuum task instance
+func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
+ // Extract configuration from params
+ var config *Config
+ if configData, ok := params.Parameters["config"]; ok {
+ if configMap, ok := configData.(map[string]interface{}); ok {
+ config = &Config{}
+ if err := config.FromMap(configMap); err != nil {
+ return nil, fmt.Errorf("failed to parse vacuum config: %v", err)
+ }
+ }
+ }
+
+ if config == nil {
+ config = NewDefaultConfig()
+ }
+
+ // Create and return the vacuum task using existing Task type
+ return NewTask(params.Server, params.VolumeID), nil
+}
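
Similarly for vacuum: with the defaults, a volume qualifies once its garbage ratio reaches 0.3 and it is at least 24 hours old, and a ratio above 0.6 bumps the task priority to high. A sketch with hypothetical numbers:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults from NewDefaultConfig plus the priority bump in Detection.
	garbageThreshold := 0.3
	minVolumeAge := 24 * time.Hour

	// Hypothetical volume: 65% garbage, three days old.
	garbageRatio := 0.65
	age := 72 * time.Hour

	needsVacuum := garbageRatio >= garbageThreshold && age >= minVolumeAge
	priority := "normal"
	if garbageRatio > 0.6 {
		priority = "high"
	}
	fmt.Printf("needsVacuum=%v priority=%s\n", needsVacuum, priority) // needsVacuum=true priority=high
}
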
diff --git a/weed/worker/tasks/vacuum/vacuum.go b/weed/worker/tasks/vacuum/vacuum.go
index fe5ebc96e..24233f59b 100644
--- a/weed/worker/tasks/vacuum/vacuum.go
+++ b/weed/worker/tasks/vacuum/vacuum.go
@@ -7,12 +7,10 @@ import (
"strconv"
"time"
- "github.com/seaweedfs/seaweedfs/weed/admin/config"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -199,247 +197,3 @@ func (t *Task) GetProgress() float64 {
func (t *Task) Cancel() error {
return t.BaseTask.Cancel()
}
-
-// VacuumConfig extends BaseConfig with vacuum-specific settings
-type VacuumConfig struct {
- base.BaseConfig
- GarbageThreshold float64 `json:"garbage_threshold"`
- MinVolumeAgeSeconds int `json:"min_volume_age_seconds"`
- MinIntervalSeconds int `json:"min_interval_seconds"`
-}
-
-// vacuumDetection implements the detection logic for vacuum tasks
-func vacuumDetection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
- if !config.IsEnabled() {
- return nil, nil
- }
-
- vacuumConfig := config.(*VacuumConfig)
- var results []*types.TaskDetectionResult
- minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
-
- for _, metric := range metrics {
- // Check if volume needs vacuum
- if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
- priority := types.TaskPriorityNormal
- if metric.GarbageRatio > 0.6 {
- priority = types.TaskPriorityHigh
- }
-
- result := &types.TaskDetectionResult{
- TaskType: types.TaskTypeVacuum,
- VolumeID: metric.VolumeID,
- Server: metric.Server,
- Collection: metric.Collection,
- Priority: priority,
- Reason: "Volume has excessive garbage requiring vacuum",
- Parameters: map[string]interface{}{
- "garbage_ratio": metric.GarbageRatio,
- "volume_age": metric.Age.String(),
- },
- ScheduleAt: time.Now(),
- }
- results = append(results, result)
- }
- }
-
- return results, nil
-}
-
-// vacuumScheduling implements the scheduling logic for vacuum tasks
-func vacuumScheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
- vacuumConfig := config.(*VacuumConfig)
-
- // Count running vacuum tasks
- runningVacuumCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeVacuum {
- runningVacuumCount++
- }
- }
-
- // Check concurrency limit
- if runningVacuumCount >= vacuumConfig.MaxConcurrent {
- return false
- }
-
- // Check for available workers with vacuum capability
- for _, worker := range availableWorkers {
- if worker.CurrentLoad < worker.MaxConcurrent {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeVacuum {
- return true
- }
- }
- }
- }
-
- return false
-}
-
-// createVacuumTask creates a new vacuum task instance
-func createVacuumTask(params types.TaskParams) (types.TaskInterface, error) {
- // Extract configuration from params
- var config *VacuumConfig
- if configData, ok := params.Parameters["config"]; ok {
- if configMap, ok := configData.(map[string]interface{}); ok {
- config = &VacuumConfig{}
- if err := config.FromMap(configMap); err != nil {
- return nil, fmt.Errorf("failed to parse vacuum config: %v", err)
- }
- }
- }
-
- if config == nil {
- config = &VacuumConfig{
- BaseConfig: base.BaseConfig{
- Enabled: true,
- ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
- MaxConcurrent: 2,
- },
- GarbageThreshold: 0.3, // 30%
- MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
- MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
- }
- }
-
- // Create and return the vacuum task using existing Task type
- return NewTask(params.Server, params.VolumeID), nil
-}
-
-// getVacuumConfigSpec returns the configuration schema for vacuum tasks
-func getVacuumConfigSpec() base.ConfigSpec {
- return base.ConfigSpec{
- Fields: []*config.Field{
- {
- Name: "enabled",
- JSONName: "enabled",
- Type: config.FieldTypeBool,
- DefaultValue: true,
- Required: false,
- DisplayName: "Enable Vacuum Tasks",
- Description: "Whether vacuum tasks should be automatically created",
- HelpText: "Toggle this to enable or disable automatic vacuum task generation",
- InputType: "checkbox",
- CSSClasses: "form-check-input",
- },
- {
- Name: "scan_interval_seconds",
- JSONName: "scan_interval_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 2 * 60 * 60,
- MinValue: 10 * 60,
- MaxValue: 24 * 60 * 60,
- Required: true,
- DisplayName: "Scan Interval",
- Description: "How often to scan for volumes needing vacuum",
- HelpText: "The system will check for volumes that need vacuuming at this interval",
- Placeholder: "2",
- Unit: config.UnitHours,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "max_concurrent",
- JSONName: "max_concurrent",
- Type: config.FieldTypeInt,
- DefaultValue: 2,
- MinValue: 1,
- MaxValue: 10,
- Required: true,
- DisplayName: "Max Concurrent Tasks",
- Description: "Maximum number of vacuum tasks that can run simultaneously",
- HelpText: "Limits the number of vacuum operations running at the same time to control system load",
- Placeholder: "2 (default)",
- Unit: config.UnitCount,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "garbage_threshold",
- JSONName: "garbage_threshold",
- Type: config.FieldTypeFloat,
- DefaultValue: 0.3,
- MinValue: 0.0,
- MaxValue: 1.0,
- Required: true,
- DisplayName: "Garbage Percentage Threshold",
- Description: "Trigger vacuum when garbage ratio exceeds this percentage",
- HelpText: "Volumes with more deleted content than this threshold will be vacuumed",
- Placeholder: "0.30 (30%)",
- Unit: config.UnitNone,
- InputType: "number",
- CSSClasses: "form-control",
- },
- {
- Name: "min_volume_age_seconds",
- JSONName: "min_volume_age_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 24 * 60 * 60,
- MinValue: 1 * 60 * 60,
- MaxValue: 7 * 24 * 60 * 60,
- Required: true,
- DisplayName: "Minimum Volume Age",
- Description: "Only vacuum volumes older than this duration",
- HelpText: "Prevents vacuuming of recently created volumes that may still be actively written to",
- Placeholder: "24",
- Unit: config.UnitHours,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- {
- Name: "min_interval_seconds",
- JSONName: "min_interval_seconds",
- Type: config.FieldTypeInterval,
- DefaultValue: 7 * 24 * 60 * 60,
- MinValue: 1 * 24 * 60 * 60,
- MaxValue: 30 * 24 * 60 * 60,
- Required: true,
- DisplayName: "Minimum Interval",
- Description: "Minimum time between vacuum operations on the same volume",
- HelpText: "Prevents excessive vacuuming of the same volume by enforcing a minimum wait time",
- Placeholder: "7",
- Unit: config.UnitDays,
- InputType: "interval",
- CSSClasses: "form-control",
- },
- },
- }
-}
-
-// initVacuum registers the refactored vacuum task (replaces the old registration)
-func initVacuum() {
- // Create configuration instance
- config := &VacuumConfig{
- BaseConfig: base.BaseConfig{
- Enabled: true,
- ScanIntervalSeconds: 2 * 60 * 60, // 2 hours
- MaxConcurrent: 2,
- },
- GarbageThreshold: 0.3, // 30%
- MinVolumeAgeSeconds: 24 * 60 * 60, // 24 hours
- MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
- }
-
- // Create complete task definition
- taskDef := &base.TaskDefinition{
- Type: types.TaskTypeVacuum,
- Name: "vacuum",
- DisplayName: "Volume Vacuum",
- Description: "Reclaims disk space by removing deleted files from volumes",
- Icon: "fas fa-broom text-primary",
- Capabilities: []string{"vacuum", "storage"},
-
- Config: config,
- ConfigSpec: getVacuumConfigSpec(),
- CreateTask: createVacuumTask,
- DetectionFunc: vacuumDetection,
- ScanInterval: 2 * time.Hour,
- SchedulingFunc: vacuumScheduling,
- MaxConcurrent: 2,
- RepeatInterval: 7 * 24 * time.Hour,
- }
-
- // Register everything with a single function call!
- base.RegisterTask(taskDef)
-}
diff --git a/weed/worker/tasks/vacuum/vacuum_register.go b/weed/worker/tasks/vacuum/vacuum_register.go
index 9d247be71..1d00937b0 100644
--- a/weed/worker/tasks/vacuum/vacuum_register.go
+++ b/weed/worker/tasks/vacuum/vacuum_register.go
@@ -1,7 +1,41 @@
package vacuum
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
// Auto-register this task when the package is imported
func init() {
- // Use new architecture instead of old registration
- initVacuum()
+ RegisterVacuumTask()
+}
+
+// RegisterVacuumTask registers the vacuum task with the new architecture
+func RegisterVacuumTask() {
+ // Create configuration instance
+ config := NewDefaultConfig()
+
+ // Create complete task definition
+ taskDef := &base.TaskDefinition{
+ Type: types.TaskTypeVacuum,
+ Name: "vacuum",
+ DisplayName: "Volume Vacuum",
+ Description: "Reclaims disk space by removing deleted files from volumes",
+ Icon: "fas fa-broom text-primary",
+ Capabilities: []string{"vacuum", "storage"},
+
+ Config: config,
+ ConfigSpec: GetConfigSpec(),
+ CreateTask: CreateTask,
+ DetectionFunc: Detection,
+ ScanInterval: 2 * time.Hour,
+ SchedulingFunc: Scheduling,
+ MaxConcurrent: 2,
+ RepeatInterval: 7 * 24 * time.Hour,
+ }
+
+ // Register everything with a single function call!
+ base.RegisterTask(taskDef)
}