aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/topology_vacuum.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/topology_vacuum.go')
-rw-r--r--weed/topology/topology_vacuum.go20
1 files changed, 18 insertions, 2 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 03340c17f..40e3f70e7 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -3,6 +3,7 @@ package topology
import (
"context"
"github.com/chrislusf/seaweedfs/weed/pb"
+ "io"
"sync/atomic"
"time"
@@ -70,11 +71,26 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
+ stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
VolumeId: uint32(vid),
Preallocate: preallocate,
})
- return err
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+ glog.V(0).Infof("%d vacuum %d on %s processed %d bytes", index, vid, url, resp.ProcessedBytes)
+ }
+ return nil
})
if err != nil {
glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err)