diff options
| author | zhangsong <songz.qqmail@qq.com> | 2019-12-01 23:29:41 +0800 |
|---|---|---|
| committer | zhangsong <zhangsong@cloudwalk.cn> | 2019-12-02 13:25:32 +0800 |
| commit | e83c36e26f9c79e4fd289e1cd531c505fc8819fc (patch) | |
| tree | 87e09936165640907c25243a074db0a4b93321e6 /weed | |
| parent | 0da7b894ccee449eed942b9ac6e5dbf775ca1d21 (diff) | |
| download | seaweedfs-e83c36e26f9c79e4fd289e1cd531c505fc8819fc.tar.xz seaweedfs-e83c36e26f9c79e4fd289e1cd531c505fc8819fc.zip | |
fix the bug of volume never be vacuumed
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/topology/topology_vacuum.go | 40 |
1 files changed, 25 insertions, 15 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index ff32f1874..ca626e973 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -13,8 +13,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) -func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool { - ch := make(chan bool, locationlist.Length()) +func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, + locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) { + ch := make(chan int, locationlist.Length()) + errCount := int32(0) for index, dn := range locationlist.list { go func(index int, url string, vid needle.VolumeId) { err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -22,11 +24,15 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi VolumeId: uint32(vid), }) if err != nil { - ch <- false + atomic.AddInt32(&errCount, 1) + ch <- -1 return err } - isNeeded := resp.GarbageRatio > garbageThreshold - ch <- isNeeded + if resp.GarbageRatio >= garbageThreshold { + ch <- index + } else { + ch <- -1 + } return nil }) if err != nil { @@ -34,18 +40,21 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi } }(index, dn.Url(), vid) } - isCheckSuccess := true + vacuumLocationList := NewVolumeLocationList() for range locationlist.list { select { - case canVacuum := <-ch: - isCheckSuccess = isCheckSuccess && canVacuum + case index := <-ch: + if index != -1 { + vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index]) + } case <-time.After(30 * time.Minute): - return false + return vacuumLocationList, false } } - return isCheckSuccess + return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0 } -func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { +func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, + locationlist *VolumeLocationList, preallocate int64) bool { vl.accessLock.Lock() vl.removeFromWritable(vid) vl.accessLock.Unlock() @@ -163,11 +172,12 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL } glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid) - if batchVacuumVolumeCheck(grpcDialOption, volumeLayout, vid, locationList, garbageThreshold) { - if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, locationList, preallocate) { - batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, locationList) + if vacuumLocationList, needVacuum := batchVacuumVolumeCheck( + grpcDialOption, volumeLayout, vid, locationList, garbageThreshold); needVacuum { + if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) { + batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList) } else { - batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, locationList) + batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList) } } } |
