diff options
Diffstat (limited to 'weed/shell/command_ec_decode.go')
| -rw-r--r-- | weed/shell/command_ec_decode.go | 43 |
1 files changed, 21 insertions, 22 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 8a705a5ae..b69e403cb 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -43,25 +43,24 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } - ctx := context.Background() vid := needle.VolumeId(*volumeId) // collect topology information - topologyInfo, err := collectTopologyInfo(ctx, commandEnv) + topologyInfo, err := collectTopologyInfo(commandEnv) if err != nil { return err } // volumeId is provided if vid != 0 { - return doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid) + return doEcDecode(commandEnv, topologyInfo, *collection, vid) } // apply to all volumes in the collection volumeIds := collectEcShardIds(topologyInfo, *collection) fmt.Printf("ec encode volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid); err != nil { + if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil { return err } } @@ -69,26 +68,26 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } -func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) { +func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) { // find volume location nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid) fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits) // collect ec shards to the server with most space - targetNodeLocation, err := collectEcShards(ctx, commandEnv, nodeToEcIndexBits, collection, vid) + targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid) if err != nil { return fmt.Errorf("collectEcShards for volume %d: %v", vid, err) } // generate a normal volume - err = generateNormalVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation) + err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation) if err != nil { return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) } // delete the previous ec shards - err = mountVolumeAndDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid) + err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid) if err != nil { return fmt.Errorf("delete ec shards for volume %d: %v", vid, err) } @@ -96,11 +95,11 @@ func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb return nil } -func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { +func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ + if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) return mountErr @@ -111,7 +110,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO // unmount ec shards for location, ecIndexBits := range nodeToEcIndexBits { fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := unmountEcShards(ctx, grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) + err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) if err != nil { return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err) } @@ -119,7 +118,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO // delete ec shards for location, ecIndexBits := range nodeToEcIndexBits { fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := sourceServerDeleteEcShards(ctx, grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) + err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) if err != nil { return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err) } @@ -128,12 +127,12 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO return nil } -func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { +func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, }) @@ -144,7 +143,7 @@ func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, v } -func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { +func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { maxShardCount := 0 var exisitngEcIndexBits erasure_coding.ShardBits @@ -170,11 +169,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(vid), Collection: collection, ShardIds: needToCopyEcIndexBits.ToUint32Slice(), @@ -204,11 +203,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB } -func collectTopologyInfo(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { +func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { |
