aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fix_replication.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-24 02:52:56 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-24 02:52:56 -0700
commit5435027ff0906097c312bb1dde126ad03a05fc18 (patch)
treeee1076db4523d7efc77fd65cd6eb6d379b0bc71c /weed/shell/command_volume_fix_replication.go
parent3be3c17f59bc409b15b0dcb687e14c4af10f94c4 (diff)
downloadseaweedfs-5435027ff0906097c312bb1dde126ad03a05fc18.tar.xz
seaweedfs-5435027ff0906097c312bb1dde126ad03a05fc18.zip
volume copy: stream out copying progress and avoid grpc request timeout
fix https://github.com/chrislusf/seaweedfs/issues/2386
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
-rw-r--r--weed/shell/command_volume_fix_replication.go16
1 files changed, 15 insertions, 1 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 5d7506c0b..c003eea91 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -255,13 +255,27 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
}
err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
+ stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: replica.info.Id,
SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
})
if replicateErr != nil {
return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+ if resp.ProcessedBytes > 0 {
+ fmt.Fprintf(writer, "volume %d processed %d bytes\n", replica.info.Id, resp.ProcessedBytes)
+ }
+ }
+
return nil
})