aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2019-12-01 23:15:01 -0800
committerGitHub <noreply@github.com>2019-12-01 23:15:01 -0800
commit72482b8e1c0713ec2fe75943ed3244d62350d6de (patch)
tree87e09936165640907c25243a074db0a4b93321e6
parent0da7b894ccee449eed942b9ac6e5dbf775ca1d21 (diff)
parente83c36e26f9c79e4fd289e1cd531c505fc8819fc (diff)
downloadseaweedfs-72482b8e1c0713ec2fe75943ed3244d62350d6de.tar.xz
seaweedfs-72482b8e1c0713ec2fe75943ed3244d62350d6de.zip
Merge pull request #1146 from song-zhang/fix-volume-not-vacuum-bug
fix the bug of volume never be vacuumed
-rw-r--r--weed/topology/topology_vacuum.go40
1 files changed, 25 insertions, 15 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index ff32f1874..ca626e973 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -13,8 +13,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
-func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
- ch := make(chan bool, locationlist.Length())
+func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
+ locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) {
+ ch := make(chan int, locationlist.Length())
+ errCount := int32(0)
for index, dn := range locationlist.list {
go func(index int, url string, vid needle.VolumeId) {
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -22,11 +24,15 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
VolumeId: uint32(vid),
})
if err != nil {
- ch <- false
+ atomic.AddInt32(&errCount, 1)
+ ch <- -1
return err
}
- isNeeded := resp.GarbageRatio > garbageThreshold
- ch <- isNeeded
+ if resp.GarbageRatio >= garbageThreshold {
+ ch <- index
+ } else {
+ ch <- -1
+ }
return nil
})
if err != nil {
@@ -34,18 +40,21 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
}
}(index, dn.Url(), vid)
}
- isCheckSuccess := true
+ vacuumLocationList := NewVolumeLocationList()
for range locationlist.list {
select {
- case canVacuum := <-ch:
- isCheckSuccess = isCheckSuccess && canVacuum
+ case index := <-ch:
+ if index != -1 {
+ vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index])
+ }
case <-time.After(30 * time.Minute):
- return false
+ return vacuumLocationList, false
}
}
- return isCheckSuccess
+ return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
-func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
+func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
+ locationlist *VolumeLocationList, preallocate int64) bool {
vl.accessLock.Lock()
vl.removeFromWritable(vid)
vl.accessLock.Unlock()
@@ -163,11 +172,12 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL
}
glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
- if batchVacuumVolumeCheck(grpcDialOption, volumeLayout, vid, locationList, garbageThreshold) {
- if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, locationList, preallocate) {
- batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, locationList)
+ if vacuumLocationList, needVacuum := batchVacuumVolumeCheck(
+ grpcDialOption, volumeLayout, vid, locationList, garbageThreshold); needVacuum {
+ if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
+ batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList)
} else {
- batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, locationList)
+ batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList)
}
}
}