aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_copy.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
-rw-r--r--weed/server/volume_grpc_copy.go55
1 files changed, 41 insertions, 14 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 10a4ec473..28018f344 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -22,7 +22,7 @@ import (
const BufferSizeLimit = 1024 * 1024 * 2
// VolumeCopy copy the .idx .dat .vif files, and mount the volume
-func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
+func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v != nil {
@@ -31,7 +31,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
+ return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
}
glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
@@ -79,22 +79,38 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}()
// println("source:", volFileInfoResp.String())
+ copyResponse := &volume_server_pb.VolumeCopyResponse{}
+ reportInterval := int64(1024*1024*128)
+ nextReportTarget := reportInterval
var modifiedTsNs int64
- if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
+ var sendErr error
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
+ if processed > nextReportTarget {
+ copyResponse.ProcessedBytes = processed
+ if sendErr = stream.Send(copyResponse); sendErr != nil {
+ return false
+ }
+ nextReportTarget = processed + reportInterval
+ }
+ return true
+ }); err != nil {
return err
}
+ if sendErr != nil {
+ return sendErr
+ }
if modifiedTsNs > 0 {
os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
- if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil {
return err
}
if modifiedTsNs > 0 {
os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
- if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil {
return err
}
if modifiedTsNs > 0 {
@@ -107,10 +123,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
})
if err != nil {
- return nil, err
+ return err
}
if dataBaseFileName == "" {
- return nil, fmt.Errorf("not found volume %d file", req.VolumeId)
+ return fmt.Errorf("not found volume %d file", req.VolumeId)
}
idxFileName = indexBaseFileName + ".idx"
@@ -125,21 +141,25 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}()
if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
- return nil, err
+ return err
}
// mount the volume
err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
+ return fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
}
- return &volume_server_pb.VolumeCopyResponse{
+ if err = stream.Send(&volume_server_pb.VolumeCopyResponse{
LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
- }, err
+ }); err != nil {
+ glog.Errorf("send response: %v", err)
+ }
+
+ return err
}
-func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) (modifiedTsNs int64, err error) {
+func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
@@ -154,7 +174,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
}
- modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
+ modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn)
if err != nil {
return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
}
@@ -188,7 +208,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
return nil
}
-func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) (modifiedTsNs int64, err error) {
+func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
glog.V(4).Infof("writing to %s", fileName)
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
if isAppend {
@@ -200,6 +220,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
}
defer dst.Close()
+ var progressedBytes int64
for {
resp, receiveErr := client.Recv()
if receiveErr == io.EOF {
@@ -212,6 +233,12 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr)
}
dst.Write(resp.FileContent)
+ progressedBytes += int64(len(resp.FileContent))
+ if progressFn != nil {
+ if !progressFn(progressedBytes) {
+ return modifiedTsNs, fmt.Errorf("interrupted copy operation")
+ }
+ }
wt.MaybeSlowdown(int64(len(resp.FileContent)))
}
return modifiedTsNs, nil