Diffstat (limited to 'weed/topology/topology_vacuum.go')
-rw-r--r--  weed/topology/topology_vacuum.go  74
1 file changed, 67 insertions(+), 7 deletions(-)
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index b1a8b66d4..eb2ca8c3b 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,7 +2,9 @@ package topology
 
 import (
 	"context"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 	"io"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -213,7 +215,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
 	}
 }
 
-func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64) {
 
 	// if there is vacuum going on, return immediately
 	swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@@ -243,25 +245,83 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
 					t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
 				}
 			} else {
-				t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
+				t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate)
 			}
 		}
 	}
 }
 
-func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
+func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64) {
 
 	volumeLayout.accessLock.RLock()
-	tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
+	todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
 	for vid, locationList := range volumeLayout.vid2location {
-		tmpMap[vid] = locationList.Copy()
+		todoVolumeMap[vid] = locationList.Copy()
 	}
 	volumeLayout.accessLock.RUnlock()
 
-	for vid, locationList := range tmpMap {
-		t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
+	// limiter for each volume server
+	limiter := make(map[NodeId]int)
+	var limiterLock sync.Mutex
+	for _, locationList := range todoVolumeMap {
+		for _, dn := range locationList.list {
+			if _, ok := limiter[dn.Id()]; !ok {
+				limiter[dn.Id()] = maxParallelVacuumPerServer
+			}
+		}
 	}
+
+	executor := util.NewLimitedConcurrentExecutor(100)
+
+	var wg sync.WaitGroup
+
+	for len(todoVolumeMap) > 0 {
+		pendingVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
+		for vid, locationList := range todoVolumeMap {
+			hasEnoughQuota := true
+			for _, dn := range locationList.list {
+				limiterLock.Lock()
+				quota := limiter[dn.Id()]
+				limiterLock.Unlock()
+				if quota <= 0 {
+					hasEnoughQuota = false
+					break
+				}
+			}
+			if !hasEnoughQuota {
+				pendingVolumeMap[vid] = locationList
+				continue
+			}
+
+			// debit the quota
+			for _, dn := range locationList.list {
+				limiterLock.Lock()
+				limiter[dn.Id()]--
+				limiterLock.Unlock()
+			}
+
+			wg.Add(1)
+			executor.Execute(func() {
+				defer wg.Done()
+				t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
+				// credit the quota
+				for _, dn := range locationList.list {
+					limiterLock.Lock()
+					limiter[dn.Id()]++
+					limiterLock.Unlock()
+				}
+			})
+		}
+
+		if len(todoVolumeMap) == len(pendingVolumeMap) {
+			time.Sleep(10 * time.Second)
+		}
+		todoVolumeMap = pendingVolumeMap
+	}
+
+	wg.Wait()
+
 }
 
 func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) {
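What this change does: instead of vacuuming volumes strictly one at a time, vacuumOneVolumeLayout now gives every volume server maxParallelVacuumPerServer vacuum slots, dispatches a volume only when every server holding a replica has a free slot, and re-queues the rest for the next pass, sleeping 10 seconds whenever a full pass admits nothing. The sketch below reduces that scheduling pattern to standard-library Go; the job type, server names, slot count, and durations are invented for illustration and are not part of the diff.

package main

import (
	"fmt"
	"sync"
	"time"
)

// job stands in for one volume to vacuum; servers are the replica holders.
type job struct {
	id      int
	servers []string
}

func main() {
	todo := []job{
		{1, []string{"vs1", "vs2"}},
		{2, []string{"vs1"}},
		{3, []string{"vs2", "vs3"}},
	}

	const maxParallelPerServer = 1 // hypothetical; the diff makes this a parameter

	// remaining vacuum slots per server, guarded by a mutex
	quota := map[string]int{}
	var mu sync.Mutex
	for _, j := range todo {
		for _, s := range j.servers {
			quota[s] = maxParallelPerServer
		}
	}

	var wg sync.WaitGroup
	for len(todo) > 0 {
		var pending []job
		for _, j := range todo {
			j := j
			// admit the job only if every server it touches has a free slot,
			// debiting all slots inside the same critical section
			mu.Lock()
			admitted := true
			for _, s := range j.servers {
				if quota[s] <= 0 {
					admitted = false
					break
				}
			}
			if admitted {
				for _, s := range j.servers {
					quota[s]-- // debit
				}
			}
			mu.Unlock()
			if !admitted {
				pending = append(pending, j) // retry on the next pass
				continue
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				fmt.Println("vacuuming volume", j.id)
				time.Sleep(50 * time.Millisecond) // stand-in for the real vacuum work
				mu.Lock()
				for _, s := range j.servers {
					quota[s]++ // credit the slots back
				}
				mu.Unlock()
			}()
		}
		if len(pending) == len(todo) {
			time.Sleep(50 * time.Millisecond) // nothing admitted this pass; back off
		}
		todo = pending
	}
	wg.Wait()
}

One subtlety the diff relies on: it checks and debits quotas under separate lock acquisitions, which is safe only because the single scheduling loop is the only decrementer; concurrent workers only credit slots back, so a quota observed as positive cannot drop before the debit. The sketch folds the check and the debit into one critical section, which is the more defensive form.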

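The diff also caps total vacuum concurrency at 100 jobs via util.NewLimitedConcurrentExecutor from weed/util. For readers unfamiliar with that helper, here is a rough stand-in built on a buffered-channel semaphore; this is an assumed sketch of the idea, not SeaweedFS's actual implementation.

package main

import (
	"fmt"
	"sync"
)

// LimitedExecutor runs submitted jobs on at most `limit` goroutines at a time.
type LimitedExecutor struct {
	sem chan struct{} // counting semaphore; capacity = concurrency limit
}

func NewLimitedExecutor(limit int) *LimitedExecutor {
	return &LimitedExecutor{sem: make(chan struct{}, limit)}
}

// Execute blocks until a slot is free, then runs job on its own goroutine.
func (e *LimitedExecutor) Execute(job func()) {
	e.sem <- struct{}{} // acquire a slot
	go func() {
		defer func() { <-e.sem }() // release the slot when the job finishes
		job()
	}()
}

func main() {
	exec := NewLimitedExecutor(2)
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		i := i
		wg.Add(1)
		exec.Execute(func() {
			defer wg.Done()
			fmt.Println("job", i)
		})
	}
	wg.Wait()
}

Blocking on submission is what makes this fit the scheduling loop above: when all 100 slots are busy, the loop simply stalls inside Execute rather than queuing unbounded work, assuming the real helper behaves similarly.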