diff options
Diffstat (limited to 'weed/worker/tasks/base')
| -rw-r--r-- | weed/worker/tasks/base/generic_components.go | 129 | ||||
| -rw-r--r-- | weed/worker/tasks/base/registration.go | 155 | ||||
| -rw-r--r-- | weed/worker/tasks/base/task_definition.go | 272 | ||||
| -rw-r--r-- | weed/worker/tasks/base/task_definition_test.go | 338 | ||||
| -rw-r--r-- | weed/worker/tasks/base/typed_task.go | 218 |
5 files changed, 1112 insertions, 0 deletions
diff --git a/weed/worker/tasks/base/generic_components.go b/weed/worker/tasks/base/generic_components.go new file mode 100644 index 000000000..27ad1bb29 --- /dev/null +++ b/weed/worker/tasks/base/generic_components.go @@ -0,0 +1,129 @@ +package base + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// GenericDetector implements TaskDetector using function-based logic +type GenericDetector struct { + taskDef *TaskDefinition +} + +// NewGenericDetector creates a detector from a task definition +func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector { + return &GenericDetector{taskDef: taskDef} +} + +// GetTaskType returns the task type +func (d *GenericDetector) GetTaskType() types.TaskType { + return d.taskDef.Type +} + +// ScanForTasks scans using the task definition's detection function +func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) { + if d.taskDef.DetectionFunc == nil { + return nil, nil + } + return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config) +} + +// ScanInterval returns the scan interval from task definition +func (d *GenericDetector) ScanInterval() time.Duration { + if d.taskDef.ScanInterval > 0 { + return d.taskDef.ScanInterval + } + return 30 * time.Minute // Default +} + +// IsEnabled returns whether this detector is enabled +func (d *GenericDetector) IsEnabled() bool { + return d.taskDef.Config.IsEnabled() +} + +// GenericScheduler implements TaskScheduler using function-based logic +type GenericScheduler struct { + taskDef *TaskDefinition +} + +// NewGenericScheduler creates a scheduler from a task definition +func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler { + return &GenericScheduler{taskDef: taskDef} +} + +// GetTaskType returns the task type +func (s *GenericScheduler) GetTaskType() types.TaskType { + return s.taskDef.Type +} + +// CanScheduleNow determines if a task can be scheduled using the task definition's function +func (s *GenericScheduler) CanScheduleNow(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { + if s.taskDef.SchedulingFunc == nil { + return s.defaultCanSchedule(task, runningTasks, availableWorkers) + } + return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config) +} + +// defaultCanSchedule provides default scheduling logic +func (s *GenericScheduler) defaultCanSchedule(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker) bool { + if !s.taskDef.Config.IsEnabled() { + return false + } + + // Count running tasks of this type + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == s.taskDef.Type { + runningCount++ + } + } + + // Check concurrency limit + maxConcurrent := s.taskDef.MaxConcurrent + if maxConcurrent <= 0 { + maxConcurrent = 1 // Default + } + if runningCount >= maxConcurrent { + return false + } + + // Check if we have available workers + for _, worker := range availableWorkers { + if worker.CurrentLoad < worker.MaxConcurrent { + for _, capability := range worker.Capabilities { + if capability == s.taskDef.Type { + return true + } + } + } + } + + return false +} + +// GetPriority returns the priority for this task +func (s *GenericScheduler) GetPriority(task *types.Task) types.TaskPriority { + return task.Priority +} + +// GetMaxConcurrent returns max concurrent tasks +func (s *GenericScheduler) GetMaxConcurrent() int { + if s.taskDef.MaxConcurrent > 0 { + return s.taskDef.MaxConcurrent + } + return 1 // Default +} + +// GetDefaultRepeatInterval returns the default repeat interval +func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration { + if s.taskDef.RepeatInterval > 0 { + return s.taskDef.RepeatInterval + } + return 24 * time.Hour // Default +} + +// IsEnabled returns whether this scheduler is enabled +func (s *GenericScheduler) IsEnabled() bool { + return s.taskDef.Config.IsEnabled() +} diff --git a/weed/worker/tasks/base/registration.go b/weed/worker/tasks/base/registration.go new file mode 100644 index 000000000..416b6f6b8 --- /dev/null +++ b/weed/worker/tasks/base/registration.go @@ -0,0 +1,155 @@ +package base + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// GenericFactory creates task instances using a TaskDefinition +type GenericFactory struct { + *tasks.BaseTaskFactory + taskDef *TaskDefinition +} + +// NewGenericFactory creates a generic task factory +func NewGenericFactory(taskDef *TaskDefinition) *GenericFactory { + return &GenericFactory{ + BaseTaskFactory: tasks.NewBaseTaskFactory( + taskDef.Type, + taskDef.Capabilities, + taskDef.Description, + ), + taskDef: taskDef, + } +} + +// Create creates a task instance using the task definition +func (f *GenericFactory) Create(params types.TaskParams) (types.TaskInterface, error) { + if f.taskDef.CreateTask == nil { + return nil, fmt.Errorf("no task creation function defined for %s", f.taskDef.Type) + } + return f.taskDef.CreateTask(params) +} + +// GenericSchemaProvider provides config schema from TaskDefinition +type GenericSchemaProvider struct { + taskDef *TaskDefinition +} + +// GetConfigSchema returns the schema from task definition +func (p *GenericSchemaProvider) GetConfigSchema() *tasks.TaskConfigSchema { + return &tasks.TaskConfigSchema{ + TaskName: string(p.taskDef.Type), + DisplayName: p.taskDef.DisplayName, + Description: p.taskDef.Description, + Icon: p.taskDef.Icon, + Schema: config.Schema{ + Fields: p.taskDef.ConfigSpec.Fields, + }, + } +} + +// GenericUIProvider provides UI functionality from TaskDefinition +type GenericUIProvider struct { + taskDef *TaskDefinition +} + +// GetTaskType returns the task type +func (ui *GenericUIProvider) GetTaskType() types.TaskType { + return ui.taskDef.Type +} + +// GetDisplayName returns the human-readable name +func (ui *GenericUIProvider) GetDisplayName() string { + return ui.taskDef.DisplayName +} + +// GetDescription returns a description of what this task does +func (ui *GenericUIProvider) GetDescription() string { + return ui.taskDef.Description +} + +// GetIcon returns the icon CSS class for this task type +func (ui *GenericUIProvider) GetIcon() string { + return ui.taskDef.Icon +} + +// GetCurrentConfig returns current config as TaskConfig +func (ui *GenericUIProvider) GetCurrentConfig() types.TaskConfig { + return ui.taskDef.Config +} + +// ApplyTaskPolicy applies protobuf TaskPolicy configuration +func (ui *GenericUIProvider) ApplyTaskPolicy(policy *worker_pb.TaskPolicy) error { + return ui.taskDef.Config.FromTaskPolicy(policy) +} + +// ApplyTaskConfig applies TaskConfig interface configuration +func (ui *GenericUIProvider) ApplyTaskConfig(config types.TaskConfig) error { + taskPolicy := config.ToTaskPolicy() + return ui.taskDef.Config.FromTaskPolicy(taskPolicy) +} + +// RegisterTask registers a complete task definition with all registries +func RegisterTask(taskDef *TaskDefinition) { + // Validate task definition + if err := validateTaskDefinition(taskDef); err != nil { + glog.Errorf("Invalid task definition for %s: %v", taskDef.Type, err) + return + } + + // Create and register factory + factory := NewGenericFactory(taskDef) + tasks.AutoRegister(taskDef.Type, factory) + + // Create and register detector/scheduler + detector := NewGenericDetector(taskDef) + scheduler := NewGenericScheduler(taskDef) + + tasks.AutoRegisterTypes(func(registry *types.TaskRegistry) { + registry.RegisterTask(detector, scheduler) + }) + + // Create and register schema provider + schemaProvider := &GenericSchemaProvider{taskDef: taskDef} + tasks.RegisterTaskConfigSchema(string(taskDef.Type), schemaProvider) + + // Create and register UI provider + uiProvider := &GenericUIProvider{taskDef: taskDef} + tasks.AutoRegisterUI(func(uiRegistry *types.UIRegistry) { + baseUIProvider := tasks.NewBaseUIProvider( + taskDef.Type, + taskDef.DisplayName, + taskDef.Description, + taskDef.Icon, + schemaProvider.GetConfigSchema, + uiProvider.GetCurrentConfig, + uiProvider.ApplyTaskPolicy, + uiProvider.ApplyTaskConfig, + ) + uiRegistry.RegisterUI(baseUIProvider) + }) + + glog.V(1).Infof("✅ Registered complete task definition: %s", taskDef.Type) +} + +// validateTaskDefinition ensures the task definition is complete +func validateTaskDefinition(taskDef *TaskDefinition) error { + if taskDef.Type == "" { + return fmt.Errorf("task type is required") + } + if taskDef.Name == "" { + return fmt.Errorf("task name is required") + } + if taskDef.Config == nil { + return fmt.Errorf("task config is required") + } + // CreateTask is optional for tasks that use the typed task system + // The typed system registers tasks separately via types.RegisterGlobalTypedTask() + return nil +} diff --git a/weed/worker/tasks/base/task_definition.go b/weed/worker/tasks/base/task_definition.go new file mode 100644 index 000000000..6689d9c81 --- /dev/null +++ b/weed/worker/tasks/base/task_definition.go @@ -0,0 +1,272 @@ +package base + +import ( + "fmt" + "reflect" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// TaskDefinition encapsulates everything needed to define a complete task type +type TaskDefinition struct { + // Basic task information + Type types.TaskType + Name string + DisplayName string + Description string + Icon string + Capabilities []string + + // Task configuration + Config TaskConfig + ConfigSpec ConfigSpec + + // Task creation + CreateTask func(params types.TaskParams) (types.TaskInterface, error) + + // Detection logic + DetectionFunc func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config TaskConfig) ([]*types.TaskDetectionResult, error) + ScanInterval time.Duration + + // Scheduling logic + SchedulingFunc func(task *types.Task, running []*types.Task, workers []*types.Worker, config TaskConfig) bool + MaxConcurrent int + RepeatInterval time.Duration +} + +// TaskConfig provides a configuration interface that supports type-safe defaults +type TaskConfig interface { + config.ConfigWithDefaults // Extends ConfigWithDefaults for type-safe schema operations + IsEnabled() bool + SetEnabled(bool) + ToTaskPolicy() *worker_pb.TaskPolicy + FromTaskPolicy(policy *worker_pb.TaskPolicy) error +} + +// ConfigSpec defines the configuration schema +type ConfigSpec struct { + Fields []*config.Field +} + +// BaseConfig provides common configuration fields with reflection-based serialization +type BaseConfig struct { + Enabled bool `json:"enabled"` + ScanIntervalSeconds int `json:"scan_interval_seconds"` + MaxConcurrent int `json:"max_concurrent"` +} + +// IsEnabled returns whether the task is enabled +func (c *BaseConfig) IsEnabled() bool { + return c.Enabled +} + +// SetEnabled sets whether the task is enabled +func (c *BaseConfig) SetEnabled(enabled bool) { + c.Enabled = enabled +} + +// Validate validates the base configuration +func (c *BaseConfig) Validate() error { + // Common validation logic + return nil +} + +// StructToMap converts any struct to a map using reflection +func StructToMap(obj interface{}) map[string]interface{} { + result := make(map[string]interface{}) + val := reflect.ValueOf(obj) + + // Handle pointer to struct + if val.Kind() == reflect.Ptr { + val = val.Elem() + } + + if val.Kind() != reflect.Struct { + return result + } + + typ := val.Type() + + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // Skip unexported fields + if !field.CanInterface() { + continue + } + + // Handle embedded structs recursively (before JSON tag check) + if field.Kind() == reflect.Struct && fieldType.Anonymous { + embeddedMap := StructToMap(field.Interface()) + for k, v := range embeddedMap { + result[k] = v + } + continue + } + + // Get JSON tag name + jsonTag := fieldType.Tag.Get("json") + if jsonTag == "" || jsonTag == "-" { + continue + } + + // Remove options like ",omitempty" + if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 { + jsonTag = jsonTag[:commaIdx] + } + + result[jsonTag] = field.Interface() + } + return result +} + +// MapToStruct loads data from map into struct using reflection +func MapToStruct(data map[string]interface{}, obj interface{}) error { + val := reflect.ValueOf(obj) + + // Must be pointer to struct + if val.Kind() != reflect.Ptr || val.Elem().Kind() != reflect.Struct { + return fmt.Errorf("obj must be pointer to struct") + } + + val = val.Elem() + typ := val.Type() + + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + fieldType := typ.Field(i) + + // Skip unexported fields + if !field.CanSet() { + continue + } + + // Handle embedded structs recursively (before JSON tag check) + if field.Kind() == reflect.Struct && fieldType.Anonymous { + err := MapToStruct(data, field.Addr().Interface()) + if err != nil { + return err + } + continue + } + + // Get JSON tag name + jsonTag := fieldType.Tag.Get("json") + if jsonTag == "" || jsonTag == "-" { + continue + } + + // Remove options like ",omitempty" + if commaIdx := strings.Index(jsonTag, ","); commaIdx >= 0 { + jsonTag = jsonTag[:commaIdx] + } + + if value, exists := data[jsonTag]; exists { + err := setFieldValue(field, value) + if err != nil { + return fmt.Errorf("failed to set field %s: %v", jsonTag, err) + } + } + } + + return nil +} + +// ToMap converts config to map using reflection +// ToTaskPolicy converts BaseConfig to protobuf (partial implementation) +// Note: Concrete implementations should override this to include task-specific config +func (c *BaseConfig) ToTaskPolicy() *worker_pb.TaskPolicy { + return &worker_pb.TaskPolicy{ + Enabled: c.Enabled, + MaxConcurrent: int32(c.MaxConcurrent), + RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), + CheckIntervalSeconds: int32(c.ScanIntervalSeconds), + // TaskConfig field should be set by concrete implementations + } +} + +// FromTaskPolicy loads BaseConfig from protobuf (partial implementation) +// Note: Concrete implementations should override this to handle task-specific config +func (c *BaseConfig) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { + if policy == nil { + return fmt.Errorf("policy is nil") + } + c.Enabled = policy.Enabled + c.MaxConcurrent = int(policy.MaxConcurrent) + c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) + return nil +} + +// ApplySchemaDefaults applies default values from schema using reflection +func (c *BaseConfig) ApplySchemaDefaults(schema *config.Schema) error { + // Use reflection-based approach for BaseConfig since it needs to handle embedded structs + return schema.ApplyDefaultsToProtobuf(c) +} + +// setFieldValue sets a field value with type conversion +func setFieldValue(field reflect.Value, value interface{}) error { + if value == nil { + return nil + } + + valueVal := reflect.ValueOf(value) + fieldType := field.Type() + valueType := valueVal.Type() + + // Direct assignment if types match + if valueType.AssignableTo(fieldType) { + field.Set(valueVal) + return nil + } + + // Type conversion for common cases + switch fieldType.Kind() { + case reflect.Bool: + if b, ok := value.(bool); ok { + field.SetBool(b) + } else { + return fmt.Errorf("cannot convert %T to bool", value) + } + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + switch v := value.(type) { + case int: + field.SetInt(int64(v)) + case int32: + field.SetInt(int64(v)) + case int64: + field.SetInt(v) + case float64: + field.SetInt(int64(v)) + default: + return fmt.Errorf("cannot convert %T to int", value) + } + case reflect.Float32, reflect.Float64: + switch v := value.(type) { + case float32: + field.SetFloat(float64(v)) + case float64: + field.SetFloat(v) + case int: + field.SetFloat(float64(v)) + case int64: + field.SetFloat(float64(v)) + default: + return fmt.Errorf("cannot convert %T to float", value) + } + case reflect.String: + if s, ok := value.(string); ok { + field.SetString(s) + } else { + return fmt.Errorf("cannot convert %T to string", value) + } + default: + return fmt.Errorf("unsupported field type %s", fieldType.Kind()) + } + + return nil +} diff --git a/weed/worker/tasks/base/task_definition_test.go b/weed/worker/tasks/base/task_definition_test.go new file mode 100644 index 000000000..a0a0a5a24 --- /dev/null +++ b/weed/worker/tasks/base/task_definition_test.go @@ -0,0 +1,338 @@ +package base + +import ( + "reflect" + "testing" +) + +// Test structs that mirror the actual configuration structure +type TestBaseConfig struct { + Enabled bool `json:"enabled"` + ScanIntervalSeconds int `json:"scan_interval_seconds"` + MaxConcurrent int `json:"max_concurrent"` +} + +type TestTaskConfig struct { + TestBaseConfig + TaskSpecificField float64 `json:"task_specific_field"` + AnotherSpecificField string `json:"another_specific_field"` +} + +type TestNestedConfig struct { + TestBaseConfig + NestedStruct struct { + NestedField string `json:"nested_field"` + } `json:"nested_struct"` + TaskField int `json:"task_field"` +} + +func TestStructToMap_WithEmbeddedStruct(t *testing.T) { + // Test case 1: Basic embedded struct + config := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 1800, + MaxConcurrent: 3, + }, + TaskSpecificField: 0.25, + AnotherSpecificField: "test_value", + } + + result := StructToMap(config) + + // Verify all fields are present + expectedFields := map[string]interface{}{ + "enabled": true, + "scan_interval_seconds": 1800, + "max_concurrent": 3, + "task_specific_field": 0.25, + "another_specific_field": "test_value", + } + + if len(result) != len(expectedFields) { + t.Errorf("Expected %d fields, got %d. Result: %+v", len(expectedFields), len(result), result) + } + + for key, expectedValue := range expectedFields { + if actualValue, exists := result[key]; !exists { + t.Errorf("Missing field: %s", key) + } else if !reflect.DeepEqual(actualValue, expectedValue) { + t.Errorf("Field %s: expected %v (%T), got %v (%T)", key, expectedValue, expectedValue, actualValue, actualValue) + } + } +} + +func TestStructToMap_WithNestedStruct(t *testing.T) { + config := &TestNestedConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: false, + ScanIntervalSeconds: 3600, + MaxConcurrent: 1, + }, + NestedStruct: struct { + NestedField string `json:"nested_field"` + }{ + NestedField: "nested_value", + }, + TaskField: 42, + } + + result := StructToMap(config) + + // Verify embedded struct fields are included + if enabled, exists := result["enabled"]; !exists || enabled != false { + t.Errorf("Expected enabled=false from embedded struct, got %v", enabled) + } + + if scanInterval, exists := result["scan_interval_seconds"]; !exists || scanInterval != 3600 { + t.Errorf("Expected scan_interval_seconds=3600 from embedded struct, got %v", scanInterval) + } + + if maxConcurrent, exists := result["max_concurrent"]; !exists || maxConcurrent != 1 { + t.Errorf("Expected max_concurrent=1 from embedded struct, got %v", maxConcurrent) + } + + // Verify regular fields are included + if taskField, exists := result["task_field"]; !exists || taskField != 42 { + t.Errorf("Expected task_field=42, got %v", taskField) + } + + // Verify nested struct is included as a whole + if nestedStruct, exists := result["nested_struct"]; !exists { + t.Errorf("Missing nested_struct field") + } else { + // The nested struct should be included as-is, not flattened + if nested, ok := nestedStruct.(struct { + NestedField string `json:"nested_field"` + }); !ok || nested.NestedField != "nested_value" { + t.Errorf("Expected nested_struct with NestedField='nested_value', got %v", nestedStruct) + } + } +} + +func TestMapToStruct_WithEmbeddedStruct(t *testing.T) { + // Test data with all fields including embedded struct fields + data := map[string]interface{}{ + "enabled": true, + "scan_interval_seconds": 2400, + "max_concurrent": 5, + "task_specific_field": 0.15, + "another_specific_field": "updated_value", + } + + config := &TestTaskConfig{} + err := MapToStruct(data, config) + + if err != nil { + t.Fatalf("MapToStruct failed: %v", err) + } + + // Verify embedded struct fields were set + if config.Enabled != true { + t.Errorf("Expected Enabled=true, got %v", config.Enabled) + } + + if config.ScanIntervalSeconds != 2400 { + t.Errorf("Expected ScanIntervalSeconds=2400, got %v", config.ScanIntervalSeconds) + } + + if config.MaxConcurrent != 5 { + t.Errorf("Expected MaxConcurrent=5, got %v", config.MaxConcurrent) + } + + // Verify regular fields were set + if config.TaskSpecificField != 0.15 { + t.Errorf("Expected TaskSpecificField=0.15, got %v", config.TaskSpecificField) + } + + if config.AnotherSpecificField != "updated_value" { + t.Errorf("Expected AnotherSpecificField='updated_value', got %v", config.AnotherSpecificField) + } +} + +func TestMapToStruct_PartialData(t *testing.T) { + // Test with only some fields present (simulating form data) + data := map[string]interface{}{ + "enabled": false, + "max_concurrent": 2, + "task_specific_field": 0.30, + } + + // Start with some initial values + config := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 1800, + MaxConcurrent: 1, + }, + TaskSpecificField: 0.20, + AnotherSpecificField: "initial_value", + } + + err := MapToStruct(data, config) + + if err != nil { + t.Fatalf("MapToStruct failed: %v", err) + } + + // Verify updated fields + if config.Enabled != false { + t.Errorf("Expected Enabled=false (updated), got %v", config.Enabled) + } + + if config.MaxConcurrent != 2 { + t.Errorf("Expected MaxConcurrent=2 (updated), got %v", config.MaxConcurrent) + } + + if config.TaskSpecificField != 0.30 { + t.Errorf("Expected TaskSpecificField=0.30 (updated), got %v", config.TaskSpecificField) + } + + // Verify unchanged fields remain the same + if config.ScanIntervalSeconds != 1800 { + t.Errorf("Expected ScanIntervalSeconds=1800 (unchanged), got %v", config.ScanIntervalSeconds) + } + + if config.AnotherSpecificField != "initial_value" { + t.Errorf("Expected AnotherSpecificField='initial_value' (unchanged), got %v", config.AnotherSpecificField) + } +} + +func TestRoundTripSerialization(t *testing.T) { + // Test complete round-trip: struct -> map -> struct + original := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 3600, + MaxConcurrent: 4, + }, + TaskSpecificField: 0.18, + AnotherSpecificField: "round_trip_test", + } + + // Convert to map + dataMap := StructToMap(original) + + // Convert back to struct + roundTrip := &TestTaskConfig{} + err := MapToStruct(dataMap, roundTrip) + + if err != nil { + t.Fatalf("Round-trip MapToStruct failed: %v", err) + } + + // Verify all fields match + if !reflect.DeepEqual(original.TestBaseConfig, roundTrip.TestBaseConfig) { + t.Errorf("BaseConfig mismatch:\nOriginal: %+v\nRound-trip: %+v", original.TestBaseConfig, roundTrip.TestBaseConfig) + } + + if original.TaskSpecificField != roundTrip.TaskSpecificField { + t.Errorf("TaskSpecificField mismatch: %v != %v", original.TaskSpecificField, roundTrip.TaskSpecificField) + } + + if original.AnotherSpecificField != roundTrip.AnotherSpecificField { + t.Errorf("AnotherSpecificField mismatch: %v != %v", original.AnotherSpecificField, roundTrip.AnotherSpecificField) + } +} + +func TestStructToMap_EmptyStruct(t *testing.T) { + config := &TestTaskConfig{} + result := StructToMap(config) + + // Should still include all fields, even with zero values + expectedFields := []string{"enabled", "scan_interval_seconds", "max_concurrent", "task_specific_field", "another_specific_field"} + + for _, field := range expectedFields { + if _, exists := result[field]; !exists { + t.Errorf("Missing field: %s", field) + } + } +} + +func TestStructToMap_NilPointer(t *testing.T) { + var config *TestTaskConfig = nil + result := StructToMap(config) + + if len(result) != 0 { + t.Errorf("Expected empty map for nil pointer, got %+v", result) + } +} + +func TestMapToStruct_InvalidInput(t *testing.T) { + data := map[string]interface{}{ + "enabled": "not_a_bool", // Wrong type + } + + config := &TestTaskConfig{} + err := MapToStruct(data, config) + + if err == nil { + t.Errorf("Expected error for invalid input type, but got none") + } +} + +func TestMapToStruct_NonPointer(t *testing.T) { + data := map[string]interface{}{ + "enabled": true, + } + + config := TestTaskConfig{} // Not a pointer + err := MapToStruct(data, config) + + if err == nil { + t.Errorf("Expected error for non-pointer input, but got none") + } +} + +// Benchmark tests to ensure performance is reasonable +func BenchmarkStructToMap(b *testing.B) { + config := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 1800, + MaxConcurrent: 3, + }, + TaskSpecificField: 0.25, + AnotherSpecificField: "benchmark_test", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = StructToMap(config) + } +} + +func BenchmarkMapToStruct(b *testing.B) { + data := map[string]interface{}{ + "enabled": true, + "scan_interval_seconds": 1800, + "max_concurrent": 3, + "task_specific_field": 0.25, + "another_specific_field": "benchmark_test", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + config := &TestTaskConfig{} + _ = MapToStruct(data, config) + } +} + +func BenchmarkRoundTrip(b *testing.B) { + original := &TestTaskConfig{ + TestBaseConfig: TestBaseConfig{ + Enabled: true, + ScanIntervalSeconds: 1800, + MaxConcurrent: 3, + }, + TaskSpecificField: 0.25, + AnotherSpecificField: "benchmark_test", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + dataMap := StructToMap(original) + roundTrip := &TestTaskConfig{} + _ = MapToStruct(dataMap, roundTrip) + } +} diff --git a/weed/worker/tasks/base/typed_task.go b/weed/worker/tasks/base/typed_task.go new file mode 100644 index 000000000..9d2839607 --- /dev/null +++ b/weed/worker/tasks/base/typed_task.go @@ -0,0 +1,218 @@ +package base + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// BaseTypedTask provides a base implementation for typed tasks with logger support +type BaseTypedTask struct { + taskType types.TaskType + taskID string + progress float64 + progressCallback func(float64) + cancelled bool + mutex sync.RWMutex + + // Logger functionality + logger tasks.TaskLogger + loggerConfig types.TaskLoggerConfig +} + +// NewBaseTypedTask creates a new base typed task +func NewBaseTypedTask(taskType types.TaskType) *BaseTypedTask { + return &BaseTypedTask{ + taskType: taskType, + progress: 0.0, + loggerConfig: types.TaskLoggerConfig{ + BaseLogDir: "/data/task_logs", + MaxTasks: 100, + MaxLogSizeMB: 10, + EnableConsole: true, + }, + } +} + +// GetType returns the task type +func (bt *BaseTypedTask) GetType() types.TaskType { + return bt.taskType +} + +// IsCancellable returns whether the task can be cancelled +func (bt *BaseTypedTask) IsCancellable() bool { + return true // Most tasks can be cancelled +} + +// Cancel cancels the task +func (bt *BaseTypedTask) Cancel() error { + bt.mutex.Lock() + defer bt.mutex.Unlock() + bt.cancelled = true + return nil +} + +// IsCancelled returns whether the task has been cancelled +func (bt *BaseTypedTask) IsCancelled() bool { + bt.mutex.RLock() + defer bt.mutex.RUnlock() + return bt.cancelled +} + +// GetProgress returns the current progress (0-100) +func (bt *BaseTypedTask) GetProgress() float64 { + bt.mutex.RLock() + defer bt.mutex.RUnlock() + return bt.progress +} + +// SetProgress sets the current progress and calls the callback if set +func (bt *BaseTypedTask) SetProgress(progress float64) { + bt.mutex.Lock() + callback := bt.progressCallback + bt.progress = progress + bt.mutex.Unlock() + + if callback != nil { + callback(progress) + } +} + +// SetProgressCallback sets the progress callback function +func (bt *BaseTypedTask) SetProgressCallback(callback func(float64)) { + bt.mutex.Lock() + defer bt.mutex.Unlock() + bt.progressCallback = callback +} + +// SetLoggerConfig sets the logger configuration for this task +func (bt *BaseTypedTask) SetLoggerConfig(config types.TaskLoggerConfig) { + bt.mutex.Lock() + defer bt.mutex.Unlock() + bt.loggerConfig = config +} + +// convertToTasksLoggerConfig converts types.TaskLoggerConfig to tasks.TaskLoggerConfig +func convertToTasksLoggerConfig(config types.TaskLoggerConfig) tasks.TaskLoggerConfig { + return tasks.TaskLoggerConfig{ + BaseLogDir: config.BaseLogDir, + MaxTasks: config.MaxTasks, + MaxLogSizeMB: config.MaxLogSizeMB, + EnableConsole: config.EnableConsole, + } +} + +// InitializeTaskLogger initializes the task logger with task details (LoggerProvider interface) +func (bt *BaseTypedTask) InitializeTaskLogger(taskID string, workerID string, params types.TaskParams) error { + bt.mutex.Lock() + defer bt.mutex.Unlock() + + bt.taskID = taskID + + // Convert the logger config to the tasks package type + tasksLoggerConfig := convertToTasksLoggerConfig(bt.loggerConfig) + + logger, err := tasks.NewTaskLogger(taskID, bt.taskType, workerID, params, tasksLoggerConfig) + if err != nil { + return fmt.Errorf("failed to initialize task logger: %w", err) + } + + bt.logger = logger + if bt.logger != nil { + bt.logger.Info("BaseTypedTask initialized for task %s (type: %s)", taskID, bt.taskType) + } + + return nil +} + +// GetTaskLogger returns the task logger (LoggerProvider interface) +func (bt *BaseTypedTask) GetTaskLogger() types.TaskLogger { + bt.mutex.RLock() + defer bt.mutex.RUnlock() + return bt.logger +} + +// LogInfo logs an info message +func (bt *BaseTypedTask) LogInfo(message string, args ...interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.Info(message, args...) + } +} + +// LogWarning logs a warning message +func (bt *BaseTypedTask) LogWarning(message string, args ...interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.Warning(message, args...) + } +} + +// LogError logs an error message +func (bt *BaseTypedTask) LogError(message string, args ...interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.Error(message, args...) + } +} + +// LogDebug logs a debug message +func (bt *BaseTypedTask) LogDebug(message string, args ...interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.Debug(message, args...) + } +} + +// LogWithFields logs a message with structured fields +func (bt *BaseTypedTask) LogWithFields(level string, message string, fields map[string]interface{}) { + bt.mutex.RLock() + logger := bt.logger + bt.mutex.RUnlock() + + if logger != nil { + logger.LogWithFields(level, message, fields) + } +} + +// ValidateTyped provides basic validation for typed parameters +func (bt *BaseTypedTask) ValidateTyped(params *worker_pb.TaskParams) error { + if params == nil { + return errors.New("task parameters cannot be nil") + } + if params.VolumeId == 0 { + return errors.New("volume_id is required") + } + if params.Server == "" { + return errors.New("server is required") + } + return nil +} + +// EstimateTimeTyped provides a default time estimation +func (bt *BaseTypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration { + // Default estimation - concrete tasks should override this + return 5 * time.Minute +} + +// ExecuteTyped is a placeholder that concrete tasks must implement +func (bt *BaseTypedTask) ExecuteTyped(params *worker_pb.TaskParams) error { + panic("ExecuteTyped must be implemented by concrete task types") +} |
