diff options
| author | HongyanShen <763987993@qq.com> | 2020-03-11 12:55:24 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-03-11 12:55:24 +0800 |
| commit | 03529fc0c29072f6f26e11ffbd7229cf92dc71ce (patch) | |
| tree | ed8833386a712c850dcef0815509774681a6ab56 /weed/shell/command_ec_common.go | |
| parent | 0fca1ae776783b37481549df40f477b7d9248d3c (diff) | |
| parent | 60f5f05c78a2918d5219c925cea5847759281a2c (diff) | |
| download | seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.tar.xz seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.zip | |
Merge pull request #1 from chrislusf/master
sync
Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 43 |
1 files changed, 19 insertions, 24 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index cfe14fed5..0db119d3c 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -15,26 +15,26 @@ import ( "google.golang.org/grpc" ) -func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { +func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { copiedShardIds := []uint32{uint32(shardId)} if applyBalancing { // ask destination node to copy shard and the ecx file from source node, and mount it - copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id) + copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id) if err != nil { return err } // unmount the to be deleted shards - err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds) + err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds) if err != nil { return err } // ask source node to delete the shard, and maybe the ecx file - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) if err != nil { return err } @@ -50,14 +50,10 @@ func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, exist } -func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServer *EcNode, startFromShardId uint32, shardCount int, +func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, + targetServer *EcNode, shardIdsToCopy []uint32, volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { - var shardIdsToCopy []uint32 - for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ { - shardIdsToCopy = append(shardIdsToCopy, shardId) - } fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -65,11 +61,13 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption if targetServer.info.Id != existingLocation { fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: shardIdsToCopy, CopyEcxFile: true, + CopyEcjFile: true, + CopyVifFile: true, SourceDataNode: existingLocation, }) if copyErr != nil { @@ -78,7 +76,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption } fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id) - _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: shardIdsToCopy, @@ -180,12 +178,12 @@ type EcRack struct { freeEcSlot int } -func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { +func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { // list all possible locations var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { @@ -213,13 +211,12 @@ func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCen return } -func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { +func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ + _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: toBeDeletedShardIds, @@ -229,13 +226,12 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt } -func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error { +func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error { fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{ + _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{ VolumeId: uint32(volumeId), ShardIds: toBeUnmountedhardIds, }) @@ -243,13 +239,12 @@ func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, }) } -func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error { +func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error { fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: toBeMountedhardIds, |
