diff options
Diffstat (limited to 'weed/worker/tasks/erasure_coding/ec.go')
| -rw-r--r-- | weed/worker/tasks/erasure_coding/ec.go | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go new file mode 100644 index 000000000..641dfc6b5 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/ec.go @@ -0,0 +1,79 @@ +package erasure_coding + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Task implements erasure coding operation to convert volumes to EC format +type Task struct { + *tasks.BaseTask + server string + volumeID uint32 +} + +// NewTask creates a new erasure coding task instance +func NewTask(server string, volumeID uint32) *Task { + task := &Task{ + BaseTask: tasks.NewBaseTask(types.TaskTypeErasureCoding), + server: server, + volumeID: volumeID, + } + return task +} + +// Execute executes the erasure coding task +func (t *Task) Execute(params types.TaskParams) error { + glog.Infof("Starting erasure coding task for volume %d on server %s", t.volumeID, t.server) + + // Simulate erasure coding operation with progress updates + steps := []struct { + name string + duration time.Duration + progress float64 + }{ + {"Analyzing volume", 2 * time.Second, 15}, + {"Creating EC shards", 5 * time.Second, 50}, + {"Verifying shards", 2 * time.Second, 75}, + {"Finalizing EC volume", 1 * time.Second, 100}, + } + + for _, step := range steps { + if t.IsCancelled() { + return fmt.Errorf("erasure coding task cancelled") + } + + glog.V(1).Infof("Erasure coding task step: %s", step.name) + t.SetProgress(step.progress) + + // Simulate work + time.Sleep(step.duration) + } + + glog.Infof("Erasure coding task completed for volume %d on server %s", t.volumeID, t.server) + return nil +} + +// Validate validates the task parameters +func (t *Task) Validate(params types.TaskParams) error { + if params.VolumeID == 0 { + return fmt.Errorf("volume_id is required") + } + if params.Server == "" { + return fmt.Errorf("server is required") + } + return nil +} + +// EstimateTime estimates the time needed for the task +func (t *Task) EstimateTime(params types.TaskParams) time.Duration { + // Base time for erasure coding operation + baseTime := 30 * time.Second + + // Could adjust based on volume size or other factors + return baseTime +} |
