diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/shell/command_ec_decode.go | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'weed/shell/command_ec_decode.go')
| -rw-r--r-- | weed/shell/command_ec_decode.go | 67 |
1 files changed, 49 insertions, 18 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index e4d597d84..59c5bf83a 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -4,8 +4,11 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "io" + "time" "google.golang.org/grpc" @@ -36,26 +39,38 @@ func (c *commandEcDecode) Help() string { } func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - - if err = commandEnv.confirmIsLocked(); err != nil { - return - } - encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeId := encodeCommand.Int("volumeId", 0, "the volume id") collection := encodeCommand.String("collection", "", "the collection name") + forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes") if err = encodeCommand.Parse(args); err != nil { return nil } + infoAboutSimulationMode(writer, *forceChanges, "-force") + + if err = commandEnv.confirmIsLocked(args); err != nil { + return + } vid := needle.VolumeId(*volumeId) // collect topology information - topologyInfo, _, err := collectTopologyInfo(commandEnv) + topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) if err != nil { return err } + if !*forceChanges { + var nodeCount int + eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + nodeCount++ + }) + if nodeCount < erasure_coding.ParityShardsCount { + glog.V(0).Infof("skip erasure coding with %d nodes, less than recommended %d nodes", nodeCount, erasure_coding.ParityShardsCount) + return nil + } + } + // volumeId is provided if vid != 0 { return doEcDecode(commandEnv, topologyInfo, *collection, vid) @@ -100,10 +115,10 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec return nil } -func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { +func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) @@ -132,11 +147,11 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, ta return nil } -func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { +func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error { fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, @@ -148,7 +163,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c } -func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { +func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) { maxShardCount := 0 var exisitngEcIndexBits erasure_coding.ShardBits @@ -174,7 +189,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) @@ -185,7 +200,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur CopyEcxFile: false, CopyEcjFile: true, CopyVifFile: true, - SourceDataNode: loc, + SourceDataNode: string(loc), }) if copyErr != nil { return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr) @@ -208,10 +223,26 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur } -func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { +func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) { + var resp *master_pb.LookupVolumeResponse + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds}) + return err + }) + if err != nil { + return nil, err + } + return resp.VolumeIdLocations, nil +} + +func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { + + if delayBeforeCollecting > 0 { + time.Sleep(delayBeforeCollecting) + } var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) @@ -243,14 +274,14 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri return } -func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits { +func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits { - nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits) + nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits) eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { for _, v := range diskInfo.EcShardInfos { if v.Id == uint32(vid) { - nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits) + nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits) } } } |
