diff options
Diffstat (limited to 'weed/topology/topology_vacuum.go')
| -rw-r--r-- | weed/topology/topology_vacuum.go | 20 |
1 files changed, 18 insertions, 2 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 03340c17f..40e3f70e7 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -3,6 +3,7 @@ package topology import ( "context" "github.com/chrislusf/seaweedfs/weed/pb" + "io" "sync/atomic" "time" @@ -70,11 +71,26 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl * go func(index int, url pb.ServerAddress, vid needle.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ + stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: uint32(vid), Preallocate: preallocate, }) - return err + if err != nil { + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + glog.V(0).Infof("%d vacuum %d on %s processed %d bytes", index, vid, url, resp.ProcessedBytes) + } + return nil }) if err != nil { glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err) |
