Diffstat (limited to 'weed/shell/command_ec_encode.go')
-rw-r--r--  weed/shell/command_ec_encode.go | 90
1 file changed, 55 insertions(+), 35 deletions(-)
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index afbf90b44..d9e3a88c6 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -118,33 +118,42 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
}
- // ...then re-balance ec shards.
+ // ...re-balance ec shards...
if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
}
+ // ...then delete original volumes.
+ if err := doDeleteVolumes(commandEnv, volumeIds, *maxParallelization); err != nil {
+ return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
+ }
return nil
}
-func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
- var ewg *ErrorWaitGroup
-
- if !commandEnv.isLocked() {
- return fmt.Errorf("lock is lost")
- }
-
- // resolve volume locations
- locations := map[needle.VolumeId][]wdclient.Location{}
+func volumeLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId) (map[needle.VolumeId][]wdclient.Location, error) {
+ res := map[needle.VolumeId][]wdclient.Location{}
for _, vid := range volumeIds {
ls, ok := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
if !ok {
- return fmt.Errorf("volume %d not found", vid)
+ return nil, fmt.Errorf("volume %d not found", vid)
}
- locations[vid] = ls
+ res[vid] = ls
+ }
+
+ return res, nil
+}
+
+func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
+ if !commandEnv.isLocked() {
+ return fmt.Errorf("lock is lost")
+ }
+ locations, err := volumeLocations(commandEnv, volumeIds)
+ if err != nil {
+ return err
}
// mark volumes as readonly
- ewg = NewErrorWaitGroup(maxParallelization)
+ ewg := NewErrorWaitGroup(maxParallelization)
for _, vid := range volumeIds {
for _, l := range locations[vid] {
ewg.Add(func() error {
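The hunk above also switches from allocating a fresh ErrorWaitGroup for each phase to reusing a single one via Reset(). For orientation, here is a minimal sketch of a bounded-parallelism error group with the same Add/Wait/Reset surface; it illustrates the calling convention only and is not SeaweedFS's actual implementation.

// Sketch only: a bounded-parallelism group matching the Add/Wait/Reset
// calls in the diff. Not the actual ErrorWaitGroup from weed/shell.
package sketch

import "sync"

type errorWaitGroup struct {
	wg   sync.WaitGroup
	sem  chan struct{} // caps in-flight tasks at maxParallelization
	mu   sync.Mutex
	errs []error
}

func newErrorWaitGroup(maxParallelization int) *errorWaitGroup {
	return &errorWaitGroup{sem: make(chan struct{}, maxParallelization)}
}

func (g *errorWaitGroup) Add(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		g.sem <- struct{}{}        // acquire a slot
		defer func() { <-g.sem }() // release it
		if err := f(); err != nil {
			g.mu.Lock()
			g.errs = append(g.errs, err)
			g.mu.Unlock()
		}
	}()
}

// Wait blocks until all added tasks finish, returning the first error seen.
func (g *errorWaitGroup) Wait() error {
	g.wg.Wait()
	g.mu.Lock()
	defer g.mu.Unlock()
	if len(g.errs) > 0 {
		return g.errs[0]
	}
	return nil
}

// Reset clears collected errors so the same group can drive the next phase,
// as doEcEncode now does between its readonly, generate, and mount steps.
func (g *errorWaitGroup) Reset() {
	g.errs = g.errs[:0]
}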
@@ -160,9 +169,9 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
}
// generate ec shards
- ewg = NewErrorWaitGroup(maxParallelization)
- for _, vid := range volumeIds {
- target := locations[vid][0]
+ ewg.Reset()
+ for i, vid := range volumeIds {
+ target := locations[vid][i%len(locations[vid])]
ewg.Add(func() error {
if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
@@ -174,30 +183,13 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
return err
}
- // ask the source volume server to delete the original volume
- ewg = NewErrorWaitGroup(maxParallelization)
- for _, vid := range volumeIds {
- for _, l := range locations[vid] {
- ewg.Add(func() error {
- if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
- return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
- }
- fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
- return nil
- })
- }
- }
- if err := ewg.Wait(); err != nil {
- return err
- }
-
// mount all ec shards for the converted volume
shardIds := make([]uint32, erasure_coding.TotalShardsCount)
for i := range shardIds {
shardIds[i] = uint32(i)
}
- ewg = NewErrorWaitGroup(maxParallelization)
+ ewg.Reset()
for _, vid := range volumeIds {
target := locations[vid][0]
ewg.Add(func() error {
@@ -214,9 +206,37 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
return nil
}
+func doDeleteVolumes(commandEnv *CommandEnv, volumeIds []needle.VolumeId, maxParallelization int) error {
+ if !commandEnv.isLocked() {
+ return fmt.Errorf("lock is lost")
+ }
+ locations, err := volumeLocations(commandEnv, volumeIds)
+ if err != nil {
+ return err
+ }
+
+ ewg := NewErrorWaitGroup(maxParallelization)
+ for _, vid := range volumeIds {
+ for _, l := range locations[vid] {
+ ewg.Add(func() error {
+ if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
+ return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
+ }
+ fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
+ return nil
+ })
+ }
+ }
+ if err := ewg.Wait(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
- fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
+ fmt.Printf("generateEcShards %d (collection %q) on %s ...\n", volumeId, collection, sourceVolumeServer)
err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{