diff options
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 55 |
1 files changed, 41 insertions, 14 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 10a4ec473..28018f344 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -22,7 +22,7 @@ import ( const BufferSizeLimit = 1024 * 1024 * 2 // VolumeCopy copy the .idx .dat .vif files, and mount the volume -func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) { +func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error { v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v != nil { @@ -31,7 +31,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId)) if err != nil { - return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err) + return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err) } glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId) @@ -79,22 +79,38 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }() // println("source:", volFileInfoResp.String()) + copyResponse := &volume_server_pb.VolumeCopyResponse{} + reportInterval := int64(1024*1024*128) + nextReportTarget := reportInterval var modifiedTsNs int64 - if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { + var sendErr error + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool { + if processed > nextReportTarget { + copyResponse.ProcessedBytes = processed + if sendErr = stream.Send(copyResponse); sendErr != nil { + return false + } + nextReportTarget = processed + reportInterval + } + return true + }); err != nil { return err } + if sendErr != nil { + return sendErr + } if modifiedTsNs > 0 { os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) } - if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil { return err } if modifiedTsNs > 0 { os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) } - if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil { return err } if modifiedTsNs > 0 { @@ -107,10 +123,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }) if err != nil { - return nil, err + return err } if dataBaseFileName == "" { - return nil, fmt.Errorf("not found volume %d file", req.VolumeId) + return fmt.Errorf("not found volume %d file", req.VolumeId) } idxFileName = indexBaseFileName + ".idx" @@ -125,21 +141,25 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }() if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16 - return nil, err + return err } // mount the volume err = vs.store.MountVolume(needle.VolumeId(req.VolumeId)) if err != nil { - return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err) + return fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err) } - return &volume_server_pb.VolumeCopyResponse{ + if err = stream.Send(&volume_server_pb.VolumeCopyResponse{ LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second), - }, err + }); err != nil { + glog.Errorf("send response: %v", err) + } + + return err } -func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) (modifiedTsNs int64, err error) { +func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: vid, @@ -154,7 +174,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) } - modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) + modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn) if err != nil { return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) } @@ -188,7 +208,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse return nil } -func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) (modifiedTsNs int64, err error) { +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { glog.V(4).Infof("writing to %s", fileName) flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC if isAppend { @@ -200,6 +220,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s } defer dst.Close() + var progressedBytes int64 for { resp, receiveErr := client.Recv() if receiveErr == io.EOF { @@ -212,6 +233,12 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr) } dst.Write(resp.FileContent) + progressedBytes += int64(len(resp.FileContent)) + if progressFn != nil { + if !progressFn(progressedBytes) { + return modifiedTsNs, fmt.Errorf("interrupted copy operation") + } + } wt.MaybeSlowdown(int64(len(resp.FileContent))) } return modifiedTsNs, nil |
