diff options
| author | qzh <951012707@qq.com> | 2022-08-22 14:08:31 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-08-21 23:08:31 -0700 |
| commit | 74b53729e1db38dddbe35258552162dc95fc3c55 (patch) | |
| tree | 847da5ce7006237dfbd051f676b5e89ac76244c4 /weed/server/volume_grpc_copy.go | |
| parent | c4e862e90852de651aa0c6e7cbe16789be9d5d28 (diff) | |
| download | seaweedfs-74b53729e1db38dddbe35258552162dc95fc3c55.tar.xz seaweedfs-74b53729e1db38dddbe35258552162dc95fc3c55.zip | |
feat(weed.move): add a speed limit parameter of moving files (#3478)
* feat(weed.move): add a speed limit parameter of moving files
* fix(weed.move): set the default value of ioBytePerSecond to vs.compactionBytePerSecond
Co-authored-by: zhihao.qu <zhihao.qu@ly.com>
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 24 |
1 files changed, 18 insertions, 6 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 630b1610b..d9928ed18 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -108,7 +108,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre nextReportTarget := reportInterval var modifiedTsNs int64 var sendErr error - if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool { + var ioBytePerSecond int64 + if req.IoBytePerSecond <= 0 { + ioBytePerSecond = vs.compactionBytePerSecond + } else { + ioBytePerSecond = req.IoBytePerSecond + } + throttler := util.NewWriteThrottler(ioBytePerSecond) + if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool { if processed > nextReportTarget { copyResponse.ProcessedBytes = processed if sendErr = stream.Send(copyResponse); sendErr != nil { @@ -117,7 +124,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre nextReportTarget = processed + reportInterval } return true - }); err != nil { + }, throttler); err != nil { return err } if sendErr != nil { @@ -127,14 +134,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) } - if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil { + if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler); err != nil { return err } if modifiedTsNs > 0 { os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) } - if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil { + if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil, throttler); err != nil { return err } if modifiedTsNs > 0 { @@ -184,6 +191,10 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre } func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { + return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond)) +} + +func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) { copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: vid, @@ -198,7 +209,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) } - modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn) + modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, throttler, isAppend, progressFn) if err != nil { return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) } @@ -207,7 +218,8 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i } -/** +/* +* only check the the differ of the file size todo: maybe should check the received count and deleted count of the volume */ |
