diff options
Diffstat (limited to 'weed/topology/topology_vacuum.go')
| -rw-r--r-- | weed/topology/topology_vacuum.go | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 9feb55b73..03340c17f 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,6 +2,7 @@ package topology import ( "context" + "github.com/chrislusf/seaweedfs/weed/pb" "sync/atomic" "time" @@ -19,7 +20,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne ch := make(chan int, locationlist.Length()) errCount := int32(0) for index, dn := range locationlist.list { - go func(index int, url string, vid needle.VolumeId) { + go func(index int, url pb.ServerAddress, vid needle.VolumeId) { err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: uint32(vid), @@ -39,7 +40,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne if err != nil { glog.V(0).Infof("Checking vacuuming %d on %s: %v", vid, url, err) } - }(index, dn.Url(), vid) + }(index, dn.ServerAddress(), vid) } vacuumLocationList := NewVolumeLocationList() @@ -66,7 +67,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl * ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { - go func(index int, url string, vid needle.VolumeId) { + go func(index int, url pb.ServerAddress, vid needle.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ @@ -82,7 +83,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl * glog.V(0).Infof("Complete vacuuming %d on %s", vid, url) ch <- true } - }(index, dn.Url(), vid) + }(index, dn.ServerAddress(), vid) } isVacuumSuccess := true @@ -104,7 +105,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V isReadOnly := false for _, dn := range locationlist.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: uint32(vid), }) @@ -130,7 +131,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ VolumeId: uint32(vid), }) |
