diff options
Diffstat (limited to 'weed/server/volume_grpc_vacuum.go')
| -rw-r--r-- | weed/server/volume_grpc_vacuum.go | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index f8d1b7fda..57bf8d867 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -24,19 +24,35 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve } -func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_server_pb.VacuumVolumeCompactRequest) (*volume_server_pb.VacuumVolumeCompactResponse, error) { +func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error { resp := &volume_server_pb.VacuumVolumeCompactResponse{} - - err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond) + reportInterval := int64(1024*1024*128) + nextReportTarget := reportInterval + + var sendErr error + err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool { + if processed > nextReportTarget { + resp.ProcessedBytes = processed + if sendErr = stream.Send(resp); sendErr != nil { + return false + } + nextReportTarget = processed + reportInterval + } + return true + }) if err != nil { glog.Errorf("compact volume %d: %v", req.VolumeId, err) - } else { - glog.V(1).Infof("compact volume %d", req.VolumeId) + return err + } + if sendErr != nil { + glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr) + return sendErr } - return resp, err + glog.V(1).Infof("compact volume %d", req.VolumeId) + return nil } |
