diff options
Diffstat (limited to 'weed/shell/command_volume_move.go')
| -rw-r--r-- | weed/shell/command_volume_move.go | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 3dcf41354..c404d8e30 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/wdclient" "io" "log" @@ -62,7 +63,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr + sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr) volumeId := needle.VolumeId(*volumeIdInt) @@ -74,7 +75,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. } // LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests. -func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) { +func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) { log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType) @@ -100,7 +101,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n return nil } -func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, diskType string) (lastAppendAtNs uint64, err error) { +func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) { // check to see if the volume is already read-only and if its not then we need // to mark it as read-only and then before we return we need to undo what we @@ -142,7 +143,7 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), - SourceDataNode: sourceVolumeServer, + SourceDataNode: string(sourceVolumeServer), DiskType: diskType, }) if replicateErr == nil { @@ -154,21 +155,21 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source return } -func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { +func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), SinceNs: lastAppendAtNs, IdleTimeoutSeconds: uint32(idleTimeout.Seconds()), - SourceVolumeServer: sourceVolumeServer, + SourceVolumeServer: string(sourceVolumeServer), }) return replicateErr }) } -func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { +func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), @@ -177,7 +178,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour }) } -func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string, writable bool) (err error) { +func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) { return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if writable { _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ @@ -195,7 +196,7 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error { for _, location := range locations { fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) - if err := markVolumeWritable(grpcDialOption, volumeId, location.Url, writable); err != nil { + if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable); err != nil { return err } } |
