aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-24 01:55:34 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-24 01:55:34 -0700
commit3be3c17f59bc409b15b0dcb687e14c4af10f94c4 (patch)
tree3458f283136b9045f00c91744ffc5953221fc022 /weed/server
parent07dd4873db996c4f4a94ee3b0975052d0ec7595b (diff)
downloadseaweedfs-3be3c17f59bc409b15b0dcb687e14c4af10f94c4.tar.xz
seaweedfs-3be3c17f59bc409b15b0dcb687e14c4af10f94c4.zip
volume vacuum: avoid timeout with streaming progress report
fix https://github.com/chrislusf/seaweedfs/issues/2396
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/volume_grpc_vacuum.go28
1 files changed, 22 insertions, 6 deletions
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index f8d1b7fda..57bf8d867 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -24,19 +24,35 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
}
-func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_server_pb.VacuumVolumeCompactRequest) (*volume_server_pb.VacuumVolumeCompactResponse, error) {
+func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
-
- err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond)
+ reportInterval := int64(1024*1024*128)
+ nextReportTarget := reportInterval
+
+ var sendErr error
+ err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool {
+ if processed > nextReportTarget {
+ resp.ProcessedBytes = processed
+ if sendErr = stream.Send(resp); sendErr != nil {
+ return false
+ }
+ nextReportTarget = processed + reportInterval
+ }
+ return true
+ })
if err != nil {
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
- } else {
- glog.V(1).Infof("compact volume %d", req.VolumeId)
+ return err
+ }
+ if sendErr != nil {
+ glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr)
+ return sendErr
}
- return resp, err
+ glog.V(1).Infof("compact volume %d", req.VolumeId)
+ return nil
}