aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-07-14 12:17:30 -0700
committerchrislu <chris.lu@gmail.com>2025-07-14 12:17:33 -0700
commit44dfa793d52e99c6e92efb9e898513fde8a0154f (patch)
tree1bf579acfb205fd142d7edbf082790d385c49806
parent606d516e34f2fc7691775bdaff8bbc5c0b4e1b6c (diff)
downloadseaweedfs-44dfa793d52e99c6e92efb9e898513fde8a0154f.tar.xz
seaweedfs-44dfa793d52e99c6e92efb9e898513fde8a0154f.zip
Collecting volume locations for volumes before EC encoding
fix https://github.com/seaweedfs/seaweedfs/issues/6963
-rw-r--r--weed/shell/command_ec_encode.go37
1 files changed, 26 insertions, 11 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index ebdd95b71..499196e8a 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -4,10 +4,11 @@ import (
"context"
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
"io"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
@@ -115,6 +116,14 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
balanceCollections = []string{*collection}
}
+ // Collect volume locations BEFORE EC encoding starts to avoid race condition
+ // where the master metadata is updated after EC encoding but before deletion
+ fmt.Printf("Collecting volume locations for %d volumes before EC encoding...\n", len(volumeIds))
+ volumeLocationsMap, err := volumeLocations(commandEnv, volumeIds)
+ if err != nil {
+ return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err)
+ }
+
// encode all requested volumes...
if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
@@ -123,10 +132,12 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
}
- // ...then delete original volumes.
- if err := doDeleteVolumes(commandEnv, volumeIds, *maxParallelization); err != nil {
- return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
+ // ...then delete original volumes using pre-collected locations.
+ fmt.Printf("Deleting original volumes after EC encoding...\n")
+ if err := doDeleteVolumesWithLocations(commandEnv, volumeIds, volumeLocationsMap, *maxParallelization); err != nil {
+ return fmt.Errorf("delete original volumes after EC encoding: %v", err)
}
+ fmt.Printf("Successfully completed EC encoding for %d volumes\n", len(volumeIds))
return nil
}
@@ -150,7 +161,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
}
locations, err := volumeLocations(commandEnv, volumeIds)
if err != nil {
- return nil
+ return fmt.Errorf("failed to get volume locations for EC encoding: %v", err)
}
// mark volumes as readonly
@@ -207,18 +218,22 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
return nil
}
-func doDeleteVolumes(commandEnv *CommandEnv, volumeIds []needle.VolumeId, maxParallelization int) error {
+// doDeleteVolumesWithLocations deletes volumes using pre-collected location information
+// This avoids race conditions where master metadata is updated after EC encoding
+func doDeleteVolumesWithLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId, volumeLocationsMap map[needle.VolumeId][]wdclient.Location, maxParallelization int) error {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
- locations, err := volumeLocations(commandEnv, volumeIds)
- if err != nil {
- return nil
- }
ewg := NewErrorWaitGroup(maxParallelization)
for _, vid := range volumeIds {
- for _, l := range locations[vid] {
+ locations, found := volumeLocationsMap[vid]
+ if !found {
+ fmt.Printf("warning: no locations found for volume %d, skipping deletion\n", vid)
+ continue
+ }
+
+ for _, l := range locations {
ewg.Add(func() error {
if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)