diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-12 22:47:52 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-12 22:47:52 -0700 |
| commit | e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6 (patch) | |
| tree | 3ad0436940263a24ac46d38a60dd1e35b2c1cdfe /weed/shell/command_ec_common.go | |
| parent | 2c9d4c8f43c1e95c75fc332ca83d19e33e5da3ac (diff) | |
| download | seaweedfs-e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6.tar.xz seaweedfs-e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6.zip | |
change server address from string to a type
Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 26 |
1 files changed, 15 insertions, 11 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index fd35bb14b..51c8c32cd 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -3,6 +3,7 @@ package shell import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "math" "sort" @@ -22,20 +23,22 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, if applyBalancing { + existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info) + // ask destination node to copy shard and the ecx file from source node, and mount it - copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id) + copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress) if err != nil { return err } // unmount the to be deleted shards - err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds) + err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds) if err != nil { return err } // ask source node to delete the shard, and maybe the ecx file - err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds) if err != nil { return err } @@ -53,13 +56,14 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer *EcNode, shardIdsToCopy []uint32, - volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { + volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) { fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) - err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + targetAddress := pb.NewServerAddressFromDataNode(targetServer.info) + err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - if targetServer.info.Id != existingLocation { + if targetAddress != existingLocation { fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ @@ -69,7 +73,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, CopyEcxFile: true, CopyEcjFile: true, CopyVifFile: true, - SourceDataNode: existingLocation, + SourceDataNode: string(existingLocation), }) if copyErr != nil { return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr) @@ -86,7 +90,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr) } - if targetServer.info.Id != existingLocation { + if targetAddress != existingLocation { copiedShardIds = shardIdsToCopy glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds) } @@ -233,7 +237,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter return } -func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { +func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error { fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) @@ -248,7 +252,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin } -func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error { +func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error { fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) @@ -261,7 +265,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s }) } -func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error { +func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error { fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) |
