diff options
Diffstat (limited to 'weed/topology/topology_vacuum.go')
| -rw-r--r-- | weed/topology/topology_vacuum.go | 24 |
1 files changed, 19 insertions, 5 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index eb2ca8c3b..83be65d7c 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,12 +2,13 @@ package topology import ( "context" - "github.com/seaweedfs/seaweedfs/weed/util" "io" "sync" "sync/atomic" "time" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/pb" "google.golang.org/grpc" @@ -215,11 +216,12 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl * } } -func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64) { +func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64, automatic bool) { // if there is vacuum going on, return immediately swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1) if !swapped { + glog.V(0).Infof("Vacuum is already running") return } defer atomic.StoreInt64(&t.vacuumLockCounter, 0) @@ -245,14 +247,21 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) } } else { - t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate) + t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate, automatic) } } + if automatic && t.isDisableVacuum { + break + } + } + if automatic && t.isDisableVacuum { + glog.V(0).Infof("Vacuum is disabled") + break } } } -func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64) { +func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64, automatic bool) { volumeLayout.accessLock.RLock() todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList) @@ -312,8 +321,13 @@ func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeL limiterLock.Unlock() } }) + if automatic && t.isDisableVacuum { + break + } + } + if automatic && t.isDisableVacuum { + break } - if len(todoVolumeMap) == len(pendingVolumeMap) { time.Sleep(10 * time.Second) } |
