aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/base
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/base')
-rw-r--r--weed/worker/tasks/base/generic_components.go129
-rw-r--r--weed/worker/tasks/base/registration.go155
-rw-r--r--weed/worker/tasks/base/task_definition.go272
-rw-r--r--weed/worker/tasks/base/task_definition_test.go338
-rw-r--r--weed/worker/tasks/base/typed_task.go218
5 files changed, 1112 insertions, 0 deletions
diff --git a/weed/worker/tasks/base/generic_components.go b/weed/worker/tasks/base/generic_components.go
new file mode 100644
index 000000000..27ad1bb29
--- /dev/null
+++ b/weed/worker/tasks/base/generic_components.go
@@ -0,0 +1,129 @@
+package base
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// GenericDetector implements TaskDetector using function-based logic
+type GenericDetector struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericDetector creates a detector from a task definition
+func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector {
+ return &GenericDetector{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (d *GenericDetector) GetTaskType() types.TaskType {
+ return d.taskDef.Type
+}
+
+// ScanForTasks scans using the task definition's detection function
+func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
+ if d.taskDef.DetectionFunc == nil {
+ return nil, nil
+ }
+ return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config)
+}
+
+// ScanInterval returns the scan interval from task definition
+func (d *GenericDetector) ScanInterval() time.Duration {
+ if d.taskDef.ScanInterval > 0 {
+ return d.taskDef.ScanInterval
+ }
+ return 30 * time.Minute // Default
+}
+
+// IsEnabled returns whether this detector is enabled
+func (d *GenericDetector) IsEnabled() bool {
+ return d.taskDef.Config.IsEnabled()
+}
+
+// GenericScheduler implements TaskScheduler using function-based logic
+type GenericScheduler struct {
+ taskDef *TaskDefinition
+}
+
+// NewGenericScheduler creates a scheduler from a task definition
+func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler {
+ return &GenericScheduler{taskDef: taskDef}
+}
+
+// GetTaskType returns the task type
+func (s *GenericScheduler) GetTaskType() types.TaskType {
+ return s.taskDef.Type
+}
+
+// CanScheduleNow determines if a task can be scheduled using the task definition's function
+func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if s.taskDef.SchedulingFunc == nil {
+ return s.defaultCanSchedule(task, runningTasks, availableWorkers)
+ }
+ return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config)
+}
+
+// defaultCanSchedule provides default scheduling logic
+func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool {
+ if !s.taskDef.Config.IsEnabled() {
+ return false
+ }
+
+ // Count running tasks of this type
+ runningCount := 0
+ for _, runningTask := range runningTasks {
+ if runningTask.Type == s.taskDef.Type {
+ runningCount++
+ }
+ }
+
+ // Check concurrency limit
+ maxConcurrent := s.taskDef.MaxConcurrent
+ if maxConcurrent <= 0 {
+ maxConcurrent = 1 // Default
+ }
+ if runningCount >= maxConcurrent {
+ return false
+ }
+
+ // Check if we have available workers
+ for _, worker := range availableWorkers {
+ if worker.CurrentLoad < worker.MaxConcurrent {
+ for _, capability := range worker.Capabilities {
+ if capability == s.taskDef.Type {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// GetPriority returns the priority for this task
+func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority {
+ return task.Priority
+}
+
+// GetMaxConcurrent returns max concurrent tasks
+func (s *GenericScheduler) GetMaxConcurrent() int {
+ if s.taskDef.MaxConcurrent > 0 {
+ return s.taskDef.MaxConcurrent
+ }
+ return 1 // Default
+}
+
+// GetDefaultRepeatInterval returns the default repeat interval
+func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration {
+ if s.taskDef.RepeatInterval > 0 {
+ return s.taskDef.RepeatInterval
+ }
+ return 24 * time.Hour // Default
+}
+
+// IsEnabled returns whether this scheduler is enabled
+func (s *GenericScheduler) IsEnabled() bool {
+ return s.taskDef.Config.IsEnabled()
+}
diff --git a/weed/worker/tasks/base/registration.go b/weed/worker/tasks/base/registration.go
new file mode 100644
index 000000000..416b6f6b8
--- /dev/null
+++ b/weed/worker/tasks/base/registration.go
@@ -0,0 +1,155 @@
+package base
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// GenericFactory creates task instances using a TaskDefinition
+type GenericFactory struct {
+ *tasks.BaseTaskFactory
+ taskDef *TaskDefinition
+}
+
+// NewGenericFactory creates a generic task factory
+func NewGenericFactory(taskDef *TaskDefinition) *GenericFactory {
+ return &GenericFactory{
+ BaseTaskFactory: tasks.NewBaseTaskFactory(
+ taskDef.Type,
+ taskDef.Capabilities,
+ taskDef.Description,
+ ),
+ taskDef: taskDef,
+ }
+}
+
+// Create creates a task instance using the task definition
+func (f *GenericFactory) Create(params types.TaskParams) (types.TaskInterface, error) {
+ if f.taskDef.CreateTask == nil {
+ return nil, fmt.Errorf("no task creation function defined for %s", f.taskDef.Type)
+ }
+ return f.taskDef.CreateTask(params)
+}
+
+// GenericSchemaProvider provides config schema from TaskDefinition
+type GenericSchemaProvider struct {
+ taskDef *TaskDefinition
+}
+
+// GetConfigSchema returns the schema from task definition
+func (p *GenericSchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema {
+ return &tasks.TaskConfigSchema{
+ TaskName: string(p.taskDef.Type),
+ DisplayName: p.taskDef.DisplayName,
+ Description: p.taskDef.Description,
+ Icon: p.taskDef.Icon,
+ Schema: config.Schema{
+ Fields: p.taskDef.ConfigSpec.Fields,
+ },
+ }
+}
+
+// GenericUIProvider provides UI functionality from TaskDefinition
+type GenericUIProvider struct {
+ taskDef *TaskDefinition
+}
+
+// GetTaskType returns the task type
+func (ui *GenericUIProvider) GetTaskType() types.TaskType {
+ return ui.taskDef.Type
+}
+
+// GetDisplayName returns the human-readable name
+func (ui *GenericUIProvider) GetDisplayName() string {
+ return ui.taskDef.DisplayName
+}
+
+// GetDescription returns a description of what this task does
+func (ui *GenericUIProvider) GetDescription() string {
+ return ui.taskDef.Description
+}
+
+// GetIcon returns the icon CSS class for this task type
+func (ui *GenericUIProvider) GetIcon() string {
+ return ui.taskDef.Icon
+}
+
+// GetCurrentConfig returns current config as TaskConfig
+func (ui *GenericUIProvider) GetCurrentConfig() types.TaskConfig {
+ return ui.taskDef.Config
+}
+
+// ApplyTaskPolicy applies protobuf TaskPolicy configuration
+func (ui *GenericUIProvider) ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ return ui.taskDef.Config.FromTaskPolicy(policy)
+}
+
+// ApplyTaskConfig applies TaskConfig interface configuration
+func (ui *GenericUIProvider) ApplyTaskConfig(config types.TaskConfig) error {
+ taskPolicy := config.ToTaskPolicy()
+ return ui.taskDef.Config.FromTaskPolicy(taskPolicy)
+}
+
+// RegisterTask registers a complete task definition with all registries
+func RegisterTask(taskDef *TaskDefinition) {
+ // Validate task definition
+ if err := validateTaskDefinition(taskDef); err != nil {
+ glog.Errorf("Invalid task definition for %s: %v", taskDef.Type, err)
+ return
+ }
+
+ // Create and register factory
+ factory := NewGenericFactory(taskDef)
+ tasks.AutoRegister(taskDef.Type, factory)
+
+ // Create and register detector/scheduler
+ detector := NewGenericDetector(taskDef)
+ scheduler := NewGenericScheduler(taskDef)
+
+ tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) {
+ registry.RegisterTask(detector, scheduler)
+ })
+
+ // Create and register schema provider
+ schemaProvider := &GenericSchemaProvider{taskDef: taskDef}
+ tasks.RegisterTaskConfigSchema(string(taskDef.Type), schemaProvider)
+
+ // Create and register UI provider
+ uiProvider := &GenericUIProvider{taskDef: taskDef}
+ tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) {
+ baseUIProvider := tasks.NewBaseUIProvider(
+ taskDef.Type,
+ taskDef.DisplayName,
+ taskDef.Description,
+ taskDef.Icon,
+ schemaProvider.GetConfigSchema,
+ uiProvider.GetCurrentConfig,
+ uiProvider.ApplyTaskPolicy,
+ uiProvider.ApplyTaskConfig,
+ )
+ uiRegistry.RegisterUI(baseUIProvider)
+ })
+
+ glog.V(1).Infof("✅ Registered complete task definition: %s", taskDef.Type)
+}
+
+// validateTaskDefinition ensures the task definition is complete
+func validateTaskDefinition(taskDef *TaskDefinition) error {
+ if taskDef.Type == "" {
+ return fmt.Errorf("task type is required")
+ }
+ if taskDef.Name == "" {
+ return fmt.Errorf("task name is required")
+ }
+ if taskDef.Config == nil {
+ return fmt.Errorf("task config is required")
+ }
+ // CreateTask is optional for tasks that use the typed task system
+ // The typed system registers tasks separately via types.RegisterGlobalTypedTask()
+ return nil
+}
diff --git a/weed/worker/tasks/base/task_definition.go b/weed/worker/tasks/base/task_definition.go
new file mode 100644
index 000000000..6689d9c81
--- /dev/null
+++ b/weed/worker/tasks/base/task_definition.go
@@ -0,0 +1,272 @@
+package base
+
+import (
+ "fmt"
+ "reflect"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/admin/config"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// TaskDefinition encapsulates everything needed to define a complete task type
+type TaskDefinition struct {
+ // Basic task information
+ Type types.TaskType
+ Name string
+ DisplayName string
+ Description string
+ Icon string
+ Capabilities []string
+
+ // Task configuration
+ Config TaskConfig
+ ConfigSpec ConfigSpec
+
+ // Task creation
+ CreateTask func(params types.TaskParams) (types.TaskInterface, error)
+
+ // Detection logic
+ DetectionFunc func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config TaskConfig) ([]*types.TaskDetectionResult, error)
+ ScanInterval time.Duration
+
+ // Scheduling logic
+ SchedulingFunc func(task *types.Task, running []*types.Task, workers []*types.Worker, config TaskConfig) bool
+ MaxConcurrent int
+ RepeatInterval time.Duration
+}
+
+// TaskConfig provides a configuration interface that supports type-safe defaults
+type TaskConfig interface {
+ config.ConfigWithDefaults // Extends ConfigWithDefaults for type-safe schema operations
+ IsEnabled() bool
+ SetEnabled(bool)
+ ToTaskPolicy() *worker_pb.TaskPolicy
+ FromTaskPolicy(policy *worker_pb.TaskPolicy) error
+}
+
+// ConfigSpec defines the configuration schema
+type ConfigSpec struct {
+ Fields []*config.Field
+}
+
+// BaseConfig provides common configuration fields with reflection-based serialization
+type BaseConfig struct {
+ Enabled bool `json:"enabled"`
+ ScanIntervalSeconds int `json:"scan_interval_seconds"`
+ MaxConcurrent int `json:"max_concurrent"`
+}
+
+// IsEnabled returns whether the task is enabled
+func (c *BaseConfig) IsEnabled() bool {
+ return c.Enabled
+}
+
+// SetEnabled sets whether the task is enabled
+func (c *BaseConfig) SetEnabled(enabled bool) {
+ c.Enabled = enabled
+}
+
+// Validate validates the base configuration
+func (c *BaseConfig) Validate() error {
+ // Common validation logic
+ return nil
+}
+
+// StructToMap converts any struct to a map using reflection
+func StructToMap(obj interface{}) map[string]interface{} {
+ result := make(map[string]interface{})
+ val := reflect.ValueOf(obj)
+
+ // Handle pointer to struct
+ if val.Kind() == reflect.Ptr {
+ val = val.Elem()
+ }
+
+ if val.Kind() != reflect.Struct {
+ return result
+ }
+
+ typ := val.Type()
+
+ for i := 0; i < val.NumField(); i++ {
+ field := val.Field(i)
+ fieldType := typ.Field(i)
+
+ // Skip unexported fields
+ if !field.CanInterface() {
+ continue
+ }
+
+ // Handle embedded structs recursively (before JSON tag check)
+ if field.Kind() == reflect.Struct && fieldType.Anonymous {
+ embeddedMap := StructToMap(field.Interface())
+ for k, v := range embeddedMap {
+ result[k] = v
+ }
+ continue
+ }
+
+ // Get JSON tag name
+ jsonTag := fieldType.Tag.Get("json")
+ if jsonTag == "" || jsonTag == "-" {
+ continue
+ }
+
+ // Remove options like ",omitempty"
+ if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
+ jsonTag = jsonTag[:commaIdx]
+ }
+
+ result[jsonTag] = field.Interface()
+ }
+ return result
+}
+
+// MapToStruct loads data from map into struct using reflection
+func MapToStruct(data map[string]interface{}, obj interface{}) error {
+ val := reflect.ValueOf(obj)
+
+ // Must be pointer to struct
+ if val.Kind() != reflect.Ptr || val.Elem().Kind() != reflect.Struct {
+ return fmt.Errorf("obj must be pointer to struct")
+ }
+
+ val = val.Elem()
+ typ := val.Type()
+
+ for i := 0; i < val.NumField(); i++ {
+ field := val.Field(i)
+ fieldType := typ.Field(i)
+
+ // Skip unexported fields
+ if !field.CanSet() {
+ continue
+ }
+
+ // Handle embedded structs recursively (before JSON tag check)
+ if field.Kind() == reflect.Struct && fieldType.Anonymous {
+ err := MapToStruct(data, field.Addr().Interface())
+ if err != nil {
+ return err
+ }
+ continue
+ }
+
+ // Get JSON tag name
+ jsonTag := fieldType.Tag.Get("json")
+ if jsonTag == "" || jsonTag == "-" {
+ continue
+ }
+
+ // Remove options like ",omitempty"
+ if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 {
+ jsonTag = jsonTag[:commaIdx]
+ }
+
+ if value, exists := data[jsonTag]; exists {
+ err := setFieldValue(field, value)
+ if err != nil {
+ return fmt.Errorf("failed to set field %s: %v", jsonTag, err)
+ }
+ }
+ }
+
+ return nil
+}
+
+// ToMap converts config to map using reflection
+// ToTaskPolicy converts BaseConfig to protobuf (partial implementation)
+// Note: Concrete implementations should override this to include task-specific config
+func (c *BaseConfig) ToTaskPolicy() *worker_pb.TaskPolicy {
+ return &worker_pb.TaskPolicy{
+ Enabled: c.Enabled,
+ MaxConcurrent: int32(c.MaxConcurrent),
+ RepeatIntervalSeconds: int32(c.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(c.ScanIntervalSeconds),
+ // TaskConfig field should be set by concrete implementations
+ }
+}
+
+// FromTaskPolicy loads BaseConfig from protobuf (partial implementation)
+// Note: Concrete implementations should override this to handle task-specific config
+func (c *BaseConfig) FromTaskPolicy(policy *worker_pb.TaskPolicy) error {
+ if policy == nil {
+ return fmt.Errorf("policy is nil")
+ }
+ c.Enabled = policy.Enabled
+ c.MaxConcurrent = int(policy.MaxConcurrent)
+ c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds)
+ return nil
+}
+
+// ApplySchemaDefaults applies default values from schema using reflection
+func (c *BaseConfig) ApplySchemaDefaults(schema *config.Schema) error {
+ // Use reflection-based approach for BaseConfig since it needs to handle embedded structs
+ return schema.ApplyDefaultsToProtobuf(c)
+}
+
+// setFieldValue sets a field value with type conversion
+func setFieldValue(field reflect.Value, value interface{}) error {
+ if value == nil {
+ return nil
+ }
+
+ valueVal := reflect.ValueOf(value)
+ fieldType := field.Type()
+ valueType := valueVal.Type()
+
+ // Direct assignment if types match
+ if valueType.AssignableTo(fieldType) {
+ field.Set(valueVal)
+ return nil
+ }
+
+ // Type conversion for common cases
+ switch fieldType.Kind() {
+ case reflect.Bool:
+ if b, ok := value.(bool); ok {
+ field.SetBool(b)
+ } else {
+ return fmt.Errorf("cannot convert %T to bool", value)
+ }
+ case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+ switch v := value.(type) {
+ case int:
+ field.SetInt(int64(v))
+ case int32:
+ field.SetInt(int64(v))
+ case int64:
+ field.SetInt(v)
+ case float64:
+ field.SetInt(int64(v))
+ default:
+ return fmt.Errorf("cannot convert %T to int", value)
+ }
+ case reflect.Float32, reflect.Float64:
+ switch v := value.(type) {
+ case float32:
+ field.SetFloat(float64(v))
+ case float64:
+ field.SetFloat(v)
+ case int:
+ field.SetFloat(float64(v))
+ case int64:
+ field.SetFloat(float64(v))
+ default:
+ return fmt.Errorf("cannot convert %T to float", value)
+ }
+ case reflect.String:
+ if s, ok := value.(string); ok {
+ field.SetString(s)
+ } else {
+ return fmt.Errorf("cannot convert %T to string", value)
+ }
+ default:
+ return fmt.Errorf("unsupported field type %s", fieldType.Kind())
+ }
+
+ return nil
+}
diff --git a/weed/worker/tasks/base/task_definition_test.go b/weed/worker/tasks/base/task_definition_test.go
new file mode 100644
index 000000000..a0a0a5a24
--- /dev/null
+++ b/weed/worker/tasks/base/task_definition_test.go
@@ -0,0 +1,338 @@
+package base
+
+import (
+ "reflect"
+ "testing"
+)
+
+// Test structs that mirror the actual configuration structure
+type TestBaseConfig struct {
+ Enabled bool `json:"enabled"`
+ ScanIntervalSeconds int `json:"scan_interval_seconds"`
+ MaxConcurrent int `json:"max_concurrent"`
+}
+
+type TestTaskConfig struct {
+ TestBaseConfig
+ TaskSpecificField float64 `json:"task_specific_field"`
+ AnotherSpecificField string `json:"another_specific_field"`
+}
+
+type TestNestedConfig struct {
+ TestBaseConfig
+ NestedStruct struct {
+ NestedField string `json:"nested_field"`
+ } `json:"nested_struct"`
+ TaskField int `json:"task_field"`
+}
+
+func TestStructToMap_WithEmbeddedStruct(t *testing.T) {
+ // Test case 1: Basic embedded struct
+ config := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 3,
+ },
+ TaskSpecificField: 0.25,
+ AnotherSpecificField: "test_value",
+ }
+
+ result := StructToMap(config)
+
+ // Verify all fields are present
+ expectedFields := map[string]interface{}{
+ "enabled": true,
+ "scan_interval_seconds": 1800,
+ "max_concurrent": 3,
+ "task_specific_field": 0.25,
+ "another_specific_field": "test_value",
+ }
+
+ if len(result) != len(expectedFields) {
+ t.Errorf("Expected %d fields, got %d. Result: %+v", len(expectedFields), len(result), result)
+ }
+
+ for key, expectedValue := range expectedFields {
+ if actualValue, exists := result[key]; !exists {
+ t.Errorf("Missing field: %s", key)
+ } else if !reflect.DeepEqual(actualValue, expectedValue) {
+ t.Errorf("Field %s: expected %v (%T), got %v (%T)", key, expectedValue, expectedValue, actualValue, actualValue)
+ }
+ }
+}
+
+func TestStructToMap_WithNestedStruct(t *testing.T) {
+ config := &TestNestedConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: false,
+ ScanIntervalSeconds: 3600,
+ MaxConcurrent: 1,
+ },
+ NestedStruct: struct {
+ NestedField string `json:"nested_field"`
+ }{
+ NestedField: "nested_value",
+ },
+ TaskField: 42,
+ }
+
+ result := StructToMap(config)
+
+ // Verify embedded struct fields are included
+ if enabled, exists := result["enabled"]; !exists || enabled != false {
+ t.Errorf("Expected enabled=false from embedded struct, got %v", enabled)
+ }
+
+ if scanInterval, exists := result["scan_interval_seconds"]; !exists || scanInterval != 3600 {
+ t.Errorf("Expected scan_interval_seconds=3600 from embedded struct, got %v", scanInterval)
+ }
+
+ if maxConcurrent, exists := result["max_concurrent"]; !exists || maxConcurrent != 1 {
+ t.Errorf("Expected max_concurrent=1 from embedded struct, got %v", maxConcurrent)
+ }
+
+ // Verify regular fields are included
+ if taskField, exists := result["task_field"]; !exists || taskField != 42 {
+ t.Errorf("Expected task_field=42, got %v", taskField)
+ }
+
+ // Verify nested struct is included as a whole
+ if nestedStruct, exists := result["nested_struct"]; !exists {
+ t.Errorf("Missing nested_struct field")
+ } else {
+ // The nested struct should be included as-is, not flattened
+ if nested, ok := nestedStruct.(struct {
+ NestedField string `json:"nested_field"`
+ }); !ok || nested.NestedField != "nested_value" {
+ t.Errorf("Expected nested_struct with NestedField='nested_value', got %v", nestedStruct)
+ }
+ }
+}
+
+func TestMapToStruct_WithEmbeddedStruct(t *testing.T) {
+ // Test data with all fields including embedded struct fields
+ data := map[string]interface{}{
+ "enabled": true,
+ "scan_interval_seconds": 2400,
+ "max_concurrent": 5,
+ "task_specific_field": 0.15,
+ "another_specific_field": "updated_value",
+ }
+
+ config := &TestTaskConfig{}
+ err := MapToStruct(data, config)
+
+ if err != nil {
+ t.Fatalf("MapToStruct failed: %v", err)
+ }
+
+ // Verify embedded struct fields were set
+ if config.Enabled != true {
+ t.Errorf("Expected Enabled=true, got %v", config.Enabled)
+ }
+
+ if config.ScanIntervalSeconds != 2400 {
+ t.Errorf("Expected ScanIntervalSeconds=2400, got %v", config.ScanIntervalSeconds)
+ }
+
+ if config.MaxConcurrent != 5 {
+ t.Errorf("Expected MaxConcurrent=5, got %v", config.MaxConcurrent)
+ }
+
+ // Verify regular fields were set
+ if config.TaskSpecificField != 0.15 {
+ t.Errorf("Expected TaskSpecificField=0.15, got %v", config.TaskSpecificField)
+ }
+
+ if config.AnotherSpecificField != "updated_value" {
+ t.Errorf("Expected AnotherSpecificField='updated_value', got %v", config.AnotherSpecificField)
+ }
+}
+
+func TestMapToStruct_PartialData(t *testing.T) {
+ // Test with only some fields present (simulating form data)
+ data := map[string]interface{}{
+ "enabled": false,
+ "max_concurrent": 2,
+ "task_specific_field": 0.30,
+ }
+
+ // Start with some initial values
+ config := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 1,
+ },
+ TaskSpecificField: 0.20,
+ AnotherSpecificField: "initial_value",
+ }
+
+ err := MapToStruct(data, config)
+
+ if err != nil {
+ t.Fatalf("MapToStruct failed: %v", err)
+ }
+
+ // Verify updated fields
+ if config.Enabled != false {
+ t.Errorf("Expected Enabled=false (updated), got %v", config.Enabled)
+ }
+
+ if config.MaxConcurrent != 2 {
+ t.Errorf("Expected MaxConcurrent=2 (updated), got %v", config.MaxConcurrent)
+ }
+
+ if config.TaskSpecificField != 0.30 {
+ t.Errorf("Expected TaskSpecificField=0.30 (updated), got %v", config.TaskSpecificField)
+ }
+
+ // Verify unchanged fields remain the same
+ if config.ScanIntervalSeconds != 1800 {
+ t.Errorf("Expected ScanIntervalSeconds=1800 (unchanged), got %v", config.ScanIntervalSeconds)
+ }
+
+ if config.AnotherSpecificField != "initial_value" {
+ t.Errorf("Expected AnotherSpecificField='initial_value' (unchanged), got %v", config.AnotherSpecificField)
+ }
+}
+
+func TestRoundTripSerialization(t *testing.T) {
+ // Test complete round-trip: struct -> map -> struct
+ original := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 3600,
+ MaxConcurrent: 4,
+ },
+ TaskSpecificField: 0.18,
+ AnotherSpecificField: "round_trip_test",
+ }
+
+ // Convert to map
+ dataMap := StructToMap(original)
+
+ // Convert back to struct
+ roundTrip := &TestTaskConfig{}
+ err := MapToStruct(dataMap, roundTrip)
+
+ if err != nil {
+ t.Fatalf("Round-trip MapToStruct failed: %v", err)
+ }
+
+ // Verify all fields match
+ if !reflect.DeepEqual(original.TestBaseConfig, roundTrip.TestBaseConfig) {
+ t.Errorf("BaseConfig mismatch:\nOriginal: %+v\nRound-trip: %+v", original.TestBaseConfig, roundTrip.TestBaseConfig)
+ }
+
+ if original.TaskSpecificField != roundTrip.TaskSpecificField {
+ t.Errorf("TaskSpecificField mismatch: %v != %v", original.TaskSpecificField, roundTrip.TaskSpecificField)
+ }
+
+ if original.AnotherSpecificField != roundTrip.AnotherSpecificField {
+ t.Errorf("AnotherSpecificField mismatch: %v != %v", original.AnotherSpecificField, roundTrip.AnotherSpecificField)
+ }
+}
+
+func TestStructToMap_EmptyStruct(t *testing.T) {
+ config := &TestTaskConfig{}
+ result := StructToMap(config)
+
+ // Should still include all fields, even with zero values
+ expectedFields := []string{"enabled", "scan_interval_seconds", "max_concurrent", "task_specific_field", "another_specific_field"}
+
+ for _, field := range expectedFields {
+ if _, exists := result[field]; !exists {
+ t.Errorf("Missing field: %s", field)
+ }
+ }
+}
+
+func TestStructToMap_NilPointer(t *testing.T) {
+ var config *TestTaskConfig = nil
+ result := StructToMap(config)
+
+ if len(result) != 0 {
+ t.Errorf("Expected empty map for nil pointer, got %+v", result)
+ }
+}
+
+func TestMapToStruct_InvalidInput(t *testing.T) {
+ data := map[string]interface{}{
+ "enabled": "not_a_bool", // Wrong type
+ }
+
+ config := &TestTaskConfig{}
+ err := MapToStruct(data, config)
+
+ if err == nil {
+ t.Errorf("Expected error for invalid input type, but got none")
+ }
+}
+
+func TestMapToStruct_NonPointer(t *testing.T) {
+ data := map[string]interface{}{
+ "enabled": true,
+ }
+
+ config := TestTaskConfig{} // Not a pointer
+ err := MapToStruct(data, config)
+
+ if err == nil {
+ t.Errorf("Expected error for non-pointer input, but got none")
+ }
+}
+
+// Benchmark tests to ensure performance is reasonable
+func BenchmarkStructToMap(b *testing.B) {
+ config := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 3,
+ },
+ TaskSpecificField: 0.25,
+ AnotherSpecificField: "benchmark_test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _ = StructToMap(config)
+ }
+}
+
+func BenchmarkMapToStruct(b *testing.B) {
+ data := map[string]interface{}{
+ "enabled": true,
+ "scan_interval_seconds": 1800,
+ "max_concurrent": 3,
+ "task_specific_field": 0.25,
+ "another_specific_field": "benchmark_test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ config := &TestTaskConfig{}
+ _ = MapToStruct(data, config)
+ }
+}
+
+func BenchmarkRoundTrip(b *testing.B) {
+ original := &TestTaskConfig{
+ TestBaseConfig: TestBaseConfig{
+ Enabled: true,
+ ScanIntervalSeconds: 1800,
+ MaxConcurrent: 3,
+ },
+ TaskSpecificField: 0.25,
+ AnotherSpecificField: "benchmark_test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ dataMap := StructToMap(original)
+ roundTrip := &TestTaskConfig{}
+ _ = MapToStruct(dataMap, roundTrip)
+ }
+}
diff --git a/weed/worker/tasks/base/typed_task.go b/weed/worker/tasks/base/typed_task.go
new file mode 100644
index 000000000..9d2839607
--- /dev/null
+++ b/weed/worker/tasks/base/typed_task.go
@@ -0,0 +1,218 @@
+package base
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// BaseTypedTask provides a base implementation for typed tasks with logger support
+type BaseTypedTask struct {
+ taskType types.TaskType
+ taskID string
+ progress float64
+ progressCallback func(float64)
+ cancelled bool
+ mutex sync.RWMutex
+
+ // Logger functionality
+ logger tasks.TaskLogger
+ loggerConfig types.TaskLoggerConfig
+}
+
+// NewBaseTypedTask creates a new base typed task
+func NewBaseTypedTask(taskType types.TaskType) *BaseTypedTask {
+ return &BaseTypedTask{
+ taskType: taskType,
+ progress: 0.0,
+ loggerConfig: types.TaskLoggerConfig{
+ BaseLogDir: "/data/task_logs",
+ MaxTasks: 100,
+ MaxLogSizeMB: 10,
+ EnableConsole: true,
+ },
+ }
+}
+
+// GetType returns the task type
+func (bt *BaseTypedTask) GetType() types.TaskType {
+ return bt.taskType
+}
+
+// IsCancellable returns whether the task can be cancelled
+func (bt *BaseTypedTask) IsCancellable() bool {
+ return true // Most tasks can be cancelled
+}
+
+// Cancel cancels the task
+func (bt *BaseTypedTask) Cancel() error {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+ bt.cancelled = true
+ return nil
+}
+
+// IsCancelled returns whether the task has been cancelled
+func (bt *BaseTypedTask) IsCancelled() bool {
+ bt.mutex.RLock()
+ defer bt.mutex.RUnlock()
+ return bt.cancelled
+}
+
+// GetProgress returns the current progress (0-100)
+func (bt *BaseTypedTask) GetProgress() float64 {
+ bt.mutex.RLock()
+ defer bt.mutex.RUnlock()
+ return bt.progress
+}
+
+// SetProgress sets the current progress and calls the callback if set
+func (bt *BaseTypedTask) SetProgress(progress float64) {
+ bt.mutex.Lock()
+ callback := bt.progressCallback
+ bt.progress = progress
+ bt.mutex.Unlock()
+
+ if callback != nil {
+ callback(progress)
+ }
+}
+
+// SetProgressCallback sets the progress callback function
+func (bt *BaseTypedTask) SetProgressCallback(callback func(float64)) {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+ bt.progressCallback = callback
+}
+
+// SetLoggerConfig sets the logger configuration for this task
+func (bt *BaseTypedTask) SetLoggerConfig(config types.TaskLoggerConfig) {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+ bt.loggerConfig = config
+}
+
+// convertToTasksLoggerConfig converts types.TaskLoggerConfig to tasks.TaskLoggerConfig
+func convertToTasksLoggerConfig(config types.TaskLoggerConfig) tasks.TaskLoggerConfig {
+ return tasks.TaskLoggerConfig{
+ BaseLogDir: config.BaseLogDir,
+ MaxTasks: config.MaxTasks,
+ MaxLogSizeMB: config.MaxLogSizeMB,
+ EnableConsole: config.EnableConsole,
+ }
+}
+
+// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface)
+func (bt *BaseTypedTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error {
+ bt.mutex.Lock()
+ defer bt.mutex.Unlock()
+
+ bt.taskID = taskID
+
+ // Convert the logger config to the tasks package type
+ tasksLoggerConfig := convertToTasksLoggerConfig(bt.loggerConfig)
+
+ logger, err := tasks.NewTaskLogger(taskID, bt.taskType, workerID, params, tasksLoggerConfig)
+ if err != nil {
+ return fmt.Errorf("failed to initialize task logger: %w", err)
+ }
+
+ bt.logger = logger
+ if bt.logger != nil {
+ bt.logger.Info("BaseTypedTask initialized for task %s (type: %s)", taskID, bt.taskType)
+ }
+
+ return nil
+}
+
+// GetTaskLogger returns the task logger (LoggerProvider interface)
+func (bt *BaseTypedTask) GetTaskLogger() types.TaskLogger {
+ bt.mutex.RLock()
+ defer bt.mutex.RUnlock()
+ return bt.logger
+}
+
+// LogInfo logs an info message
+func (bt *BaseTypedTask) LogInfo(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Info(message, args...)
+ }
+}
+
+// LogWarning logs a warning message
+func (bt *BaseTypedTask) LogWarning(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Warning(message, args...)
+ }
+}
+
+// LogError logs an error message
+func (bt *BaseTypedTask) LogError(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Error(message, args...)
+ }
+}
+
+// LogDebug logs a debug message
+func (bt *BaseTypedTask) LogDebug(message string, args ...interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.Debug(message, args...)
+ }
+}
+
+// LogWithFields logs a message with structured fields
+func (bt *BaseTypedTask) LogWithFields(level string, message string, fields map[string]interface{}) {
+ bt.mutex.RLock()
+ logger := bt.logger
+ bt.mutex.RUnlock()
+
+ if logger != nil {
+ logger.LogWithFields(level, message, fields)
+ }
+}
+
+// ValidateTyped provides basic validation for typed parameters
+func (bt *BaseTypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
+ if params == nil {
+ return errors.New("task parameters cannot be nil")
+ }
+ if params.VolumeId == 0 {
+ return errors.New("volume_id is required")
+ }
+ if params.Server == "" {
+ return errors.New("server is required")
+ }
+ return nil
+}
+
+// EstimateTimeTyped provides a default time estimation
+func (bt *BaseTypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
+ // Default estimation - concrete tasks should override this
+ return 5 * time.Minute
+}
+
+// ExecuteTyped is a placeholder that concrete tasks must implement
+func (bt *BaseTypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
+ panic("ExecuteTyped must be implemented by concrete task types")
+}