aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_decode.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_ec_decode.go')
-rw-r--r--weed/shell/command_ec_decode.go67
1 files changed, 49 insertions, 18 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index e4d597d84..59c5bf83a 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -4,8 +4,11 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
+ "time"
"google.golang.org/grpc"
@@ -36,26 +39,38 @@ func (c *commandEcDecode) Help() string {
}
func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
-
- if err = commandEnv.confirmIsLocked(); err != nil {
- return
- }
-
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
collection := encodeCommand.String("collection", "", "the collection name")
+ forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
if err = encodeCommand.Parse(args); err != nil {
return nil
}
+ infoAboutSimulationMode(writer, *forceChanges, "-force")
+
+ if err = commandEnv.confirmIsLocked(args); err != nil {
+ return
+ }
vid := needle.VolumeId(*volumeId)
// collect topology information
- topologyInfo, _, err := collectTopologyInfo(commandEnv)
+ topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
+ if !*forceChanges {
+ var nodeCount int
+ eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ nodeCount++
+ })
+ if nodeCount < erasure_coding.ParityShardsCount {
+ glog.V(0).Infof("skip erasure coding with %d nodes, less than recommended %d nodes", nodeCount, erasure_coding.ParityShardsCount)
+ return nil
+ }
+ }
+
// volumeId is provided
if vid != 0 {
return doEcDecode(commandEnv, topologyInfo, *collection, vid)
@@ -100,10 +115,10 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
return nil
}
-func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
+func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
- if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(vid),
})
@@ -132,11 +147,11 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, ta
return nil
}
-func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
VolumeId: uint32(vid),
Collection: collection,
@@ -148,7 +163,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c
}
-func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
+func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) {
maxShardCount := 0
var exisitngEcIndexBits erasure_coding.ShardBits
@@ -174,7 +189,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur
continue
}
- err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
@@ -185,7 +200,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur
CopyEcxFile: false,
CopyEcjFile: true,
CopyVifFile: true,
- SourceDataNode: loc,
+ SourceDataNode: string(loc),
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
@@ -208,10 +223,26 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur
}
-func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
+func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
+ var resp *master_pb.LookupVolumeResponse
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
+ return err
+ })
+ if err != nil {
+ return nil, err
+ }
+ return resp.VolumeIdLocations, nil
+}
+
+func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
+
+ if delayBeforeCollecting > 0 {
+ time.Sleep(delayBeforeCollecting)
+ }
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
@@ -243,14 +274,14 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri
return
}
-func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits {
+func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
- nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
+ nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Id == uint32(vid) {
- nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
+ nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)
}
}
}