aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_vacuum.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_vacuum.go')
-rw-r--r--weed/server/volume_grpc_vacuum.go28
1 files changed, 22 insertions, 6 deletions
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index f8d1b7fda..57bf8d867 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -24,19 +24,35 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
}
-func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_server_pb.VacuumVolumeCompactRequest) (*volume_server_pb.VacuumVolumeCompactResponse, error) {
+func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
-
- err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond)
+ reportInterval := int64(1024*1024*128)
+ nextReportTarget := reportInterval
+
+ var sendErr error
+ err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool {
+ if processed > nextReportTarget {
+ resp.ProcessedBytes = processed
+ if sendErr = stream.Send(resp); sendErr != nil {
+ return false
+ }
+ nextReportTarget = processed + reportInterval
+ }
+ return true
+ })
if err != nil {
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
- } else {
- glog.V(1).Infof("compact volume %d", req.VolumeId)
+ return err
+ }
+ if sendErr != nil {
+ glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr)
+ return sendErr
}
- return resp, err
+ glog.V(1).Infof("compact volume %d", req.VolumeId)
+ return nil
}