about | summary | refs | log | tree | commit | diff
path: root/weed/topology/topology_vacuum.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/topology_vacuum.go')
-rw-r--r-- weed/topology/topology_vacuum.go 74
1 files changed, 67 insertions, 7 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index b1a8b66d4..eb2ca8c3b 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,7 +2,9 @@ package topology
import (
"context"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "sync"
"sync/atomic"
"time"
@@ -213,7 +215,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
-func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64) {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@@ -243,25 +245,83 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
}
} else {
- t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
+ t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate)
}
}
}
}
}
-func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
+func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64) { // vacuums every volume of volumeLayout, starting at most maxParallelVacuumPerServer vacuums at a time per volume server; returns once all dispatched vacuums have finished
volumeLayout.accessLock.RLock() // snapshot vid2location under the read lock; the lock is released before any vacuum work starts
+ todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList) // volumes still waiting to be dispatched, keyed by volume id
for vid, locationList := range volumeLayout.vid2location {
- tmpMap[vid] = locationList.Copy()
+ todoVolumeMap[vid] = locationList.Copy() // work on a copy of each location list, not the layout's own
}
volumeLayout.accessLock.RUnlock()
- for vid, locationList := range tmpMap {
- t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
+ // limiter holds, per volume server (keyed by node id), how many more vacuums may be started on it right now
+ limiter := make(map[NodeId]int)
+ var limiterLock sync.Mutex // guards limiter, which is touched both by the scheduling loop below and by the closures handed to the executor
+ for _, locationList := range todoVolumeMap {
+ for _, dn := range locationList.list {
+ if _, ok := limiter[dn.Id()]; !ok {
+ limiter[dn.Id()] = maxParallelVacuumPerServer // every server found in the snapshot starts with the full quota
+ }
+ }
}
+
+ executor := util.NewLimitedConcurrentExecutor(100) // NOTE(review): 100 is presumably the executor's overall concurrency cap — confirm in weed/util
+
+ var wg sync.WaitGroup
+ // each pass tries to dispatch every remaining volume; volumes that cannot get quota are carried over to the next pass
+ for len(todoVolumeMap) > 0 {
+ pendingVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
+ for vid, locationList := range todoVolumeMap {
+ hasEnoughQuota := true // stays true only if every server holding this volume has quota left
+ for _, dn := range locationList.list {
+ limiterLock.Lock()
+ quota := limiter[dn.Id()]
+ limiterLock.Unlock()
+ if quota <= 0 {
+ hasEnoughQuota = false
+ break
+ }
+ }
+ if !hasEnoughQuota {
+ pendingVolumeMap[vid] = locationList // retry in a later pass
+ continue
+ }
+
+ // debit the quota on every server holding this volume; only this loop debits and the closures only credit, so the quota checked above cannot have shrunk in between
+ for _, dn := range locationList.list {
+ limiterLock.Lock()
+ limiter[dn.Id()]--
+ limiterLock.Unlock()
+ }
+
+ wg.Add(1)
+ executor.Execute(func() { // NOTE(review): the closure captures the loop variables vid and locationList; before Go 1.22 these are shared across iterations — confirm the module's Go version
+ defer wg.Done()
+ t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
+ // credit the quota back once this volume's vacuum call has returned
+ for _, dn := range locationList.list {
+ limiterLock.Lock()
+ limiter[dn.Id()]++
+ limiterLock.Unlock()
+ }
+ })
+ }
+ // nothing could be dispatched in this pass: wait before retrying. NOTE(review): with maxParallelVacuumPerServer <= 0 every volume that has a location stays pending and this loop never ends
+ if len(todoVolumeMap) == len(pendingVolumeMap) {
+ time.Sleep(10 * time.Second)
+ }
+ todoVolumeMap = pendingVolumeMap
+ }
+ // block until every dispatched vacuum closure has called wg.Done
+ wg.Wait()
+
}
func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) {