aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_copy.go
diff options
context:
space:
mode:
authorqzh <951012707@qq.com>2022-08-22 14:08:31 +0800
committerGitHub <noreply@github.com>2022-08-21 23:08:31 -0700
commit74b53729e1db38dddbe35258552162dc95fc3c55 (patch)
tree847da5ce7006237dfbd051f676b5e89ac76244c4 /weed/server/volume_grpc_copy.go
parentc4e862e90852de651aa0c6e7cbe16789be9d5d28 (diff)
downloadseaweedfs-74b53729e1db38dddbe35258552162dc95fc3c55.tar.xz
seaweedfs-74b53729e1db38dddbe35258552162dc95fc3c55.zip
feat(weed.move): add a speed limit parameter of moving files (#3478)
* feat(weed.move): add a speed limit parameter of moving files * fix(weed.move): set the default value of ioBytePerSecond to vs.compactionBytePerSecond Co-authored-by: zhihao.qu <zhihao.qu@ly.com>
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
-rw-r--r--weed/server/volume_grpc_copy.go24
1 files changed, 18 insertions, 6 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 630b1610b..d9928ed18 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -108,7 +108,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
nextReportTarget := reportInterval
var modifiedTsNs int64
var sendErr error
- if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
+ var ioBytePerSecond int64
+ if req.IoBytePerSecond <= 0 {
+ ioBytePerSecond = vs.compactionBytePerSecond
+ } else {
+ ioBytePerSecond = req.IoBytePerSecond
+ }
+ throttler := util.NewWriteThrottler(ioBytePerSecond)
+ if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
if processed > nextReportTarget {
copyResponse.ProcessedBytes = processed
if sendErr = stream.Send(copyResponse); sendErr != nil {
@@ -117,7 +124,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
nextReportTarget = processed + reportInterval
}
return true
- }); err != nil {
+ }, throttler); err != nil {
return err
}
if sendErr != nil {
@@ -127,14 +134,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
- if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil {
+ if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler); err != nil {
return err
}
if modifiedTsNs > 0 {
os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
- if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil {
+ if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil, throttler); err != nil {
return err
}
if modifiedTsNs > 0 {
@@ -184,6 +191,10 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
+ return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond))
+}
+
+func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
@@ -198,7 +209,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
}
- modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn)
+ modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, throttler, isAppend, progressFn)
if err != nil {
return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
}
@@ -207,7 +218,8 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
}
-/**
+/*
+*
only check the the differ of the file size
todo: maybe should check the received count and deleted count of the volume
*/