aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_encode.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_ec_encode.go')
-rw-r--r--weed/shell/command_ec_encode.go66
1 files changed, 59 insertions, 7 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 817529478..e69d1bfc7 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -59,26 +59,47 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr
}
ctx := context.Background()
+ vid := needle.VolumeId(*volumeId)
+ // volumeId is provided
+ if vid != 0 {
+ return doEcEncode(ctx, commandEnv, *collection, vid)
+ }
+
+ // apply to all volumes in the collection
+ volumeIds, err := collectVolumeByCollection(ctx, commandEnv, *collection)
+ if err != nil {
+ return err
+ }
+ for _, vid := range volumeIds {
+ if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) {
// find volume location
- locations := commandEnv.masterClient.GetLocations(uint32(*volumeId))
+ locations := commandEnv.masterClient.GetLocations(uint32(vid))
if len(locations) == 0 {
- return fmt.Errorf("volume %d not found", *volumeId)
+ return fmt.Errorf("volume %d not found", vid)
}
// generate ec shards
- err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), *collection, locations[0].Url)
+ err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
if err != nil {
- return fmt.Errorf("generate ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err)
+ return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
// balance the ec shards to current cluster
- err = balanceEcShards(ctx, commandEnv, needle.VolumeId(*volumeId), *collection, locations)
+ err = balanceEcShards(ctx, commandEnv, vid, collection, locations)
if err != nil {
- return fmt.Errorf("balance ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err)
+ return fmt.Errorf("balance ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
- return err
+ return nil
}
func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
@@ -325,3 +346,34 @@ func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcN
return
}
+
+func collectVolumeByCollection(ctx context.Context, commandEnv *commandEnv, selectedCollection string) (vids []needle.VolumeId, err error) {
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return
+ }
+
+ vidMap := make(map[uint32]bool)
+ for _, dc := range resp.TopologyInfo.DataCenterInfos {
+ for _, r := range dc.RackInfos {
+ for _, dn := range r.DataNodeInfos {
+ for _, v := range dn.VolumeInfos {
+ if v.Collection == selectedCollection {
+ vidMap[v.Id] = true
+ }
+ }
+ }
+ }
+ }
+
+ for vid, _ := range vidMap {
+ vids = append(vids, needle.VolumeId(vid))
+ }
+
+ return
+}