author Chris Lu <chrislusf@users.noreply.github.com> 2025-08-01 11:18:32 -0700
committer GitHub <noreply@github.com> 2025-08-01 11:18:32 -0700
commit 0975968e71b05368d5f28f788cf863c2042c2696 (patch)
tree 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/tasks/erasure_coding/ec_task.go
parent 1cba609bfa2306cc2885df212febd5ff954aa693 (diff)
download seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz
seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
Diffstat (limited to 'weed/worker/tasks/erasure_coding/ec_task.go')
-rw-r--r-- weed/worker/tasks/erasure_coding/ec_task.go | 660
1 file changed, 660 insertions, 0 deletions
diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go
new file mode 100644
index 000000000..a6a3f749f
--- /dev/null
+++ b/weed/worker/tasks/erasure_coding/ec_task.go
@@ -0,0 +1,660 @@
+package erasure_coding
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types/base"
+ "google.golang.org/grpc"
+)
+
+// ErasureCodingTask implements the Task interface
+type ErasureCodingTask struct {
+ *base.BaseTask
+ server string
+ volumeID uint32
+ collection string
+ workDir string
+ progress float64
+
+ // EC parameters
+ dataShards int32
+ parityShards int32
+ destinations []*worker_pb.ECDestination
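+ // shardAssignment example (hypothetical addresses): {"10.0.0.1:8080": ["ec00", "ec01", "ecx", "vif"]}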
+ shardAssignment map[string][]string // destination -> assigned shard types
+ replicas []string // volume replica servers for deletion
+}
+
+// NewErasureCodingTask creates a new unified EC task instance
+func NewErasureCodingTask(id string, server string, volumeID uint32, collection string) *ErasureCodingTask {
+ return &ErasureCodingTask{
+ BaseTask: base.NewBaseTask(id, types.TaskTypeErasureCoding),
+ server: server,
+ volumeID: volumeID,
+ collection: collection,
+ dataShards: erasure_coding.DataShardsCount, // default, overridden by task params in Execute
+ parityShards: erasure_coding.ParityShardsCount, // default, overridden by task params in Execute
+ }
+}
+
+// Execute implements the UnifiedTask interface
+func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
+ if params == nil {
+ return fmt.Errorf("task parameters are required")
+ }
+
+ ecParams := params.GetErasureCodingParams()
+ if ecParams == nil {
+ return fmt.Errorf("erasure coding parameters are required")
+ }
+
+ t.dataShards = ecParams.DataShards
+ t.parityShards = ecParams.ParityShards
+ t.workDir = ecParams.WorkingDir
+ t.destinations = ecParams.Destinations
+ t.replicas = params.Replicas // Get replicas from task parameters
+
+ t.GetLogger().WithFields(map[string]interface{}{
+ "volume_id": t.volumeID,
+ "server": t.server,
+ "collection": t.collection,
+ "data_shards": t.dataShards,
+ "parity_shards": t.parityShards,
+ "destinations": len(t.destinations),
+ }).Info("Starting erasure coding task")
+
+ // Use the working directory from task parameters, or fall back to a default
+ baseWorkDir := t.workDir
+ if baseWorkDir == "" {
+ baseWorkDir = "/tmp/seaweedfs_ec_work"
+ }
+
+ // Create unique working directory for this task
+ taskWorkDir := filepath.Join(baseWorkDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
+ if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
+ return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
+ }
+ glog.V(1).Infof("Created working directory: %s", taskWorkDir)
+
+ // Update the task's working directory to the specific instance directory
+ t.workDir = taskWorkDir
+ glog.V(1).Infof("Task working directory configured: %s (logs will be written here)", taskWorkDir)
+
+ // Ensure cleanup of working directory (but preserve logs)
+ defer func() {
+ // Clean up volume files and EC shards, but preserve the directory structure and any logs
+ patterns := []string{"*.dat", "*.idx", "*.ec*", "*.vif"}
+ for _, pattern := range patterns {
+ matches, err := filepath.Glob(filepath.Join(taskWorkDir, pattern))
+ if err != nil {
+ continue
+ }
+ for _, match := range matches {
+ if err := os.Remove(match); err != nil {
+ glog.V(2).Infof("Could not remove %s: %v", match, err)
+ }
+ }
+ }
+ glog.V(1).Infof("Cleaned up volume files from working directory: %s (logs preserved)", taskWorkDir)
+ }()
+
+ // Step 1: Mark volume readonly
+ t.ReportProgress(10.0)
+ t.GetLogger().Info("Marking volume readonly")
+ if err := t.markVolumeReadonly(); err != nil {
+ return fmt.Errorf("failed to mark volume readonly: %v", err)
+ }
+
+ // Step 2: Copy volume files to worker
+ t.ReportProgress(25.0)
+ t.GetLogger().Info("Copying volume files to worker")
+ localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to copy volume files: %v", err)
+ }
+
+ // Step 3: Generate EC shards locally
+ t.ReportProgress(40.0)
+ t.GetLogger().Info("Generating EC shards locally")
+ shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir)
+ if err != nil {
+ return fmt.Errorf("failed to generate EC shards: %v", err)
+ }
+
+ // Step 4: Distribute shards to destinations
+ t.ReportProgress(60.0)
+ t.GetLogger().Info("Distributing EC shards to destinations")
+ if err := t.distributeEcShards(shardFiles); err != nil {
+ return fmt.Errorf("failed to distribute EC shards: %v", err)
+ }
+
+ // Step 5: Mount EC shards
+ t.ReportProgress(80.0)
+ t.GetLogger().Info("Mounting EC shards")
+ if err := t.mountEcShards(); err != nil {
+ return fmt.Errorf("failed to mount EC shards: %v", err)
+ }
+
+ // Step 6: Delete original volume
+ t.ReportProgress(90.0)
+ t.GetLogger().Info("Deleting original volume")
+ if err := t.deleteOriginalVolume(); err != nil {
+ return fmt.Errorf("failed to delete original volume: %v", err)
+ }
+
+ t.ReportProgress(100.0)
+ glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed",
+ t.volumeID, t.server, len(shardFiles))
+
+ return nil
+}
+
+// Validate implements the UnifiedTask interface
+func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error {
+ if params == nil {
+ return fmt.Errorf("task parameters are required")
+ }
+
+ ecParams := params.GetErasureCodingParams()
+ if ecParams == nil {
+ return fmt.Errorf("erasure coding parameters are required")
+ }
+
+ if params.VolumeId != t.volumeID {
+ return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
+ }
+
+ if params.Server != t.server {
+ return fmt.Errorf("source server mismatch: expected %s, got %s", t.server, params.Server)
+ }
+
+ if ecParams.DataShards < 1 {
+ return fmt.Errorf("invalid data shards: %d (must be >= 1)", ecParams.DataShards)
+ }
+
+ if ecParams.ParityShards < 1 {
+ return fmt.Errorf("invalid parity shards: %d (must be >= 1)", ecParams.ParityShards)
+ }
+
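+ // With the default 10+4 scheme (DataShardsCount + ParityShardsCount), this requires
+ // at least 14 destinations, i.e. one shard per destination.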
+ if len(ecParams.Destinations) < int(ecParams.DataShards+ecParams.ParityShards) {
+ return fmt.Errorf("insufficient destinations: got %d, need %d", len(ecParams.Destinations), ecParams.DataShards+ecParams.ParityShards)
+ }
+
+ return nil
+}
+
+// EstimateTime implements the UnifiedTask interface
+func (t *ErasureCodingTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
+ // Fixed placeholder estimate; actual duration depends on volume size and network throughput
+ return 20 * time.Second
+}
+
+// GetProgress returns current progress
+func (t *ErasureCodingTask) GetProgress() float64 {
+ return t.progress
+}
+
+// Helper methods for actual EC operations
+
+// markVolumeReadonly marks the volume as readonly on the source server
+func (t *ErasureCodingTask) markVolumeReadonly() error {
+ return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
+ VolumeId: t.volumeID,
+ })
+ return err
+ })
+}
+
+// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker
+func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
+ localFiles := make(map[string]string)
+
+ // Copy .dat file
+ datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
+ if err := t.copyFileFromSource(".dat", datFile); err != nil {
+ return nil, fmt.Errorf("failed to copy .dat file: %v", err)
+ }
+ localFiles["dat"] = datFile
+
+ // Copy .idx file
+ idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
+ if err := t.copyFileFromSource(".idx", idxFile); err != nil {
+ return nil, fmt.Errorf("failed to copy .idx file: %v", err)
+ }
+ localFiles["idx"] = idxFile
+
+ return localFiles, nil
+}
+
+// copyFileFromSource copies a file from source server to local path using gRPC streaming
+func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error {
+ return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
+ func(client volume_server_pb.VolumeServerClient) error {
+ stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: t.volumeID,
+ Collection: t.collection,
+ Ext: ext,
+ StopOffset: uint64(math.MaxInt64), // stream to the end of the file
+ })
+ if err != nil {
+ return fmt.Errorf("failed to initiate file copy: %v", err)
+ }
+
+ // Create local file
+ localFile, err := os.Create(localPath)
+ if err != nil {
+ return fmt.Errorf("failed to create local file %s: %v", localPath, err)
+ }
+ defer localFile.Close()
+
+ // Stream data and write to local file
+ totalBytes := int64(0)
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("failed to receive file data: %v", err)
+ }
+
+ if len(resp.FileContent) > 0 {
+ written, writeErr := localFile.Write(resp.FileContent)
+ if writeErr != nil {
+ return fmt.Errorf("failed to write to local file: %v", writeErr)
+ }
+ totalBytes += int64(written)
+ }
+ }
+
+ glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath)
+ return nil
+ })
+}
+
+// generateEcShardsLocally generates EC shards from local volume files
+func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
+ datFile := localFiles["dat"]
+ idxFile := localFiles["idx"]
+
+ if datFile == "" || idxFile == "" {
+ return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
+ }
+
+ // Get base name without extension for EC operations
+ baseName := strings.TrimSuffix(datFile, ".dat")
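+ // shardFiles maps logical names ("ec00".."ec13", "ecx", "vif") to local file paths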
+ shardFiles := make(map[string]string)
+
+ glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
+
+ // Generate EC shard files (.ec00 ~ .ec13)
+ if err := erasure_coding.WriteEcFiles(baseName); err != nil {
+ return nil, fmt.Errorf("failed to generate EC shard files: %v", err)
+ }
+
+ // Generate .ecx file from .idx (use baseName, not full idx path)
+ if err := erasure_coding.WriteSortedFileFromIdx(baseName, ".ecx"); err != nil {
+ return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
+ }
+
+ // Collect generated shard file paths
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
+ if _, err := os.Stat(shardFile); err == nil {
+ shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile
+ }
+ }
+
+ // Add metadata files
+ ecxFile := baseName + ".ecx"
+ if _, err := os.Stat(ecxFile); err == nil {
+ shardFiles["ecx"] = ecxFile
+ }
+
+ // Generate .vif file (volume info)
+ vifFile := baseName + ".vif"
+ volumeInfo := &volume_server_pb.VolumeInfo{
+ Version: uint32(needle.GetCurrentVersion()),
+ }
+ if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
+ glog.Warningf("Failed to create .vif file: %v", err)
+ } else {
+ shardFiles["vif"] = vifFile
+ }
+
+ glog.V(1).Infof("Generated %d EC files locally", len(shardFiles))
+ return shardFiles, nil
+}
+
+// distributeEcShards distributes locally generated EC shards to destination servers
+func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error {
+ if len(t.destinations) == 0 {
+ return fmt.Errorf("no destinations specified for EC shard distribution")
+ }
+
+ if len(shardFiles) == 0 {
+ return fmt.Errorf("no shard files available for distribution")
+ }
+
+ // Create shard assignment: assign specific shards to specific destinations
+ shardAssignment := t.createShardAssignment(shardFiles)
+ if len(shardAssignment) == 0 {
+ return fmt.Errorf("failed to create shard assignment")
+ }
+
+ // Store assignment for use during mounting
+ t.shardAssignment = shardAssignment
+
+ // Send assigned shards to each destination
+ for destNode, assignedShards := range shardAssignment {
+ t.GetLogger().WithFields(map[string]interface{}{
+ "destination": destNode,
+ "assigned_shards": len(assignedShards),
+ "shard_ids": assignedShards,
+ }).Info("Distributing assigned EC shards to destination")
+
+ // Send only the assigned shards to this destination
+ for _, shardType := range assignedShards {
+ filePath, exists := shardFiles[shardType]
+ if !exists {
+ return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
+ }
+
+ if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil {
+ return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err)
+ }
+ }
+ }
+
+ glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
+ return nil
+}
+
+// createShardAssignment assigns specific EC shards to specific destination servers
+// Each destination gets a subset of shards based on availability and placement rules
+func (t *ErasureCodingTask) createShardAssignment(shardFiles map[string]string) map[string][]string {
+ assignment := make(map[string][]string)
+
+ // Collect all available EC shards (ec00-ec13)
+ var availableShards []string
+ for shardType := range shardFiles {
+ if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
+ availableShards = append(availableShards, shardType)
+ }
+ }
+
+ // Sort shards for consistent assignment
+ sort.Strings(availableShards)
+
+ if len(availableShards) == 0 {
+ glog.Warningf("No EC shards found for assignment")
+ return assignment
+ }
+
+ // Calculate shards per destination
+ numDestinations := len(t.destinations)
+ if numDestinations == 0 {
+ return assignment
+ }
+
+ // Strategy: Distribute shards as evenly as possible across destinations
+ // With 14 shards and N destinations, some destinations get ⌈14/N⌉ shards, others get ⌊14/N⌋
+ shardsPerDest := len(availableShards) / numDestinations
+ extraShards := len(availableShards) % numDestinations
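+ // Worked example: 14 shards across 4 destinations gives shardsPerDest=3, extraShards=2,
+ // so the first two destinations receive 4 shards each and the remaining two receive 3.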
+
+ shardIndex := 0
+ for i, dest := range t.destinations {
+ var destShards []string
+
+ // Assign base number of shards
+ shardsToAssign := shardsPerDest
+
+ // Assign one extra shard to first 'extraShards' destinations
+ if i < extraShards {
+ shardsToAssign++
+ }
+
+ // Assign the shards
+ for j := 0; j < shardsToAssign && shardIndex < len(availableShards); j++ {
+ destShards = append(destShards, availableShards[shardIndex])
+ shardIndex++
+ }
+
+ assignment[dest.Node] = destShards
+
+ glog.V(2).Infof("Assigned shards %v to destination %s", destShards, dest.Node)
+ }
+
+ // Assign metadata files (.ecx, .vif) to each destination that has shards
+ // Note: .ecj files are created during mount, not during initial generation
+ for destNode, destShards := range assignment {
+ if len(destShards) > 0 {
+ // Add .ecx file if available
+ if _, hasEcx := shardFiles["ecx"]; hasEcx {
+ assignment[destNode] = append(assignment[destNode], "ecx")
+ }
+
+ // Add .vif file if available
+ if _, hasVif := shardFiles["vif"]; hasVif {
+ assignment[destNode] = append(assignment[destNode], "vif")
+ }
+
+ glog.V(2).Infof("Assigned metadata files (.ecx, .vif) to destination %s", destNode)
+ }
+ }
+
+ return assignment
+}
+
+// sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API
+func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, shardType string) error {
+ return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), grpc.WithInsecure(),
+ func(client volume_server_pb.VolumeServerClient) error {
+ // Open the local shard file
+ file, err := os.Open(filePath)
+ if err != nil {
+ return fmt.Errorf("failed to open shard file %s: %v", filePath, err)
+ }
+ defer file.Close()
+
+ // Get file size
+ fileInfo, err := file.Stat()
+ if err != nil {
+ return fmt.Errorf("failed to get file info for %s: %v", filePath, err)
+ }
+
+ // Determine file extension and shard ID
+ var ext string
+ var shardId uint32
+ if shardType == "ecx" {
+ ext = ".ecx"
+ shardId = 0 // ecx file doesn't have a specific shard ID
+ } else if shardType == "vif" {
+ ext = ".vif"
+ shardId = 0 // vif file doesn't have a specific shard ID
+ } else if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
+ // EC shard file like "ec00", "ec01", etc.
+ ext = "." + shardType
+ if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err != nil {
+ return fmt.Errorf("invalid shard type %s: %v", shardType, err)
+ }
+ } else {
+ return fmt.Errorf("unknown shard type: %s", shardType)
+ }
+
+ // Create streaming client
+ stream, err := client.ReceiveFile(context.Background())
+ if err != nil {
+ return fmt.Errorf("failed to create receive stream: %v", err)
+ }
+
+ // Send file info first
+ err = stream.Send(&volume_server_pb.ReceiveFileRequest{
+ Data: &volume_server_pb.ReceiveFileRequest_Info{
+ Info: &volume_server_pb.ReceiveFileInfo{
+ VolumeId: t.volumeID,
+ Ext: ext,
+ Collection: t.collection,
+ IsEcVolume: true,
+ ShardId: shardId,
+ FileSize: uint64(fileInfo.Size()),
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("failed to send file info: %v", err)
+ }
+
+ // Send file content in chunks
+ buffer := make([]byte, 64*1024) // 64KB chunks
+ for {
+ n, readErr := file.Read(buffer)
+ if n > 0 {
+ err = stream.Send(&volume_server_pb.ReceiveFileRequest{
+ Data: &volume_server_pb.ReceiveFileRequest_FileContent{
+ FileContent: buffer[:n],
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("failed to send file content: %v", err)
+ }
+ }
+ if readErr == io.EOF {
+ break
+ }
+ if readErr != nil {
+ return fmt.Errorf("failed to read file: %v", readErr)
+ }
+ }
+
+ // Close stream and get response
+ resp, err := stream.CloseAndRecv()
+ if err != nil {
+ return fmt.Errorf("failed to close stream: %v", err)
+ }
+
+ if resp.Error != "" {
+ return fmt.Errorf("server error: %s", resp.Error)
+ }
+
+ glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
+ return nil
+ })
+}
+
+// mountEcShards mounts EC shards on destination servers
+func (t *ErasureCodingTask) mountEcShards() error {
+ if t.shardAssignment == nil {
+ return fmt.Errorf("shard assignment not available for mounting")
+ }
+
+ // Mount only assigned shards on each destination
+ for destNode, assignedShards := range t.shardAssignment {
+ // Convert shard names to shard IDs for mounting
+ var shardIds []uint32
+ for _, shardType := range assignedShards {
+ // Skip metadata files (.ecx, .vif) - only mount EC shards
+ if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
+ // Parse shard ID from "ec00", "ec01", etc.
+ var shardId uint32
+ if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err == nil {
+ shardIds = append(shardIds, shardId)
+ }
+ }
+ }
+
+ if len(shardIds) == 0 {
+ glog.V(1).Infof("No EC shards to mount on %s (only metadata files)", destNode)
+ continue
+ }
+
+ glog.V(1).Infof("Mounting shards %v on %s", shardIds, destNode)
+
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(),
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+ VolumeId: t.volumeID,
+ Collection: t.collection,
+ ShardIds: shardIds,
+ })
+ return mountErr
+ })
+
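+ // Mount failures are deliberately non-fatal here: log a warning and continue with the remaining destinations.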
+ if err != nil {
+ glog.Warningf("Failed to mount shards %v on %s: %v", shardIds, destNode, err)
+ } else {
+ glog.V(1).Infof("Successfully mounted EC shards %v on %s", shardIds, destNode)
+ }
+ }
+
+ return nil
+}
+
+// deleteOriginalVolume deletes the original volume and all its replicas from all servers
+func (t *ErasureCodingTask) deleteOriginalVolume() error {
+ // Get replicas from task parameters (set during detection)
+ replicas := t.getReplicas()
+
+ if len(replicas) == 0 {
+ glog.Warningf("No replicas found for volume %d, falling back to source server only", t.volumeID)
+ replicas = []string{t.server}
+ }
+
+ glog.V(1).Infof("Deleting volume %d from %d replica servers: %v", t.volumeID, len(replicas), replicas)
+
+ // Delete volume from all replica locations
+ var deleteErrors []string
+ successCount := 0
+
+ for _, replicaServer := range replicas {
+ err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), grpc.WithInsecure(),
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
+ VolumeId: t.volumeID,
+ OnlyEmpty: false, // Force delete since we've created EC shards
+ })
+ return err
+ })
+
+ if err != nil {
+ deleteErrors = append(deleteErrors, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, replicaServer, err))
+ glog.Warningf("Failed to delete volume %d from replica server %s: %v", t.volumeID, replicaServer, err)
+ } else {
+ successCount++
+ glog.V(1).Infof("Successfully deleted volume %d from replica server %s", t.volumeID, replicaServer)
+ }
+ }
+
+ // Report results
+ if len(deleteErrors) > 0 {
+ glog.Warningf("Some volume deletions failed (%d/%d successful): %v", successCount, len(replicas), deleteErrors)
+ // Don't return error - EC task should still be considered successful if shards are mounted
+ } else {
+ glog.V(1).Infof("Successfully deleted volume %d from all %d replica servers", t.volumeID, len(replicas))
+ }
+
+ return nil
+}
+
+// getReplicas extracts replica servers from task parameters
+func (t *ErasureCodingTask) getReplicas() []string {
+ // Replicas are stored on the task from params.Replicas during Execute
+ return t.replicas
+}