-rw-r--r--  weed/command/master.go                       27
-rw-r--r--  weed/command/server.go                        1
-rw-r--r--  weed/server/master_grpc_server_volume.go      2
-rw-r--r--  weed/server/master_server.go                 12
-rw-r--r--  weed/server/master_server_handlers_admin.go   2
-rw-r--r--  weed/topology/topology_event_handling.go      4
-rw-r--r--  weed/topology/topology_vacuum.go             74
7 files changed, 94 insertions(+), 28 deletions(-)
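This change adds a `-maxParallelVacuumPerServer` option to the master (default 1), threads it from the command-line flags through `MasterOption` into `Topology.Vacuum`, and rewrites `vacuumOneVolumeLayout` so that volumes are vacuumed concurrently while no single volume server works on more than the configured number of volumes at a time.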
diff --git a/weed/command/master.go b/weed/command/master.go
index 914853d88..1a6db1945 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -36,14 +36,15 @@ var (
)
type MasterOptions struct {
- port *int
- portGrpc *int
- ip *string
- ipBind *string
- metaFolder *string
- peers *string
- volumeSizeLimitMB *uint
- volumePreallocate *bool
+ port *int
+ portGrpc *int
+ ip *string
+ ipBind *string
+ metaFolder *string
+ peers *string
+ volumeSizeLimitMB *uint
+ volumePreallocate *bool
+ maxParallelVacuumPerServer *int
// pulseSeconds *int
defaultReplication *string
garbageThreshold *float64
@@ -70,6 +71,7 @@ func init() {
m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
+ m.maxParallelVacuumPerServer = cmdMaster.Flag.Int("maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel per volume server")
// m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "", "Default replication type if not specified.")
m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
@@ -311,10 +313,11 @@ func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
masterAddress := pb.NewServerAddress(*m.ip, *m.port, *m.portGrpc)
return &weed_server.MasterOption{
- Master: masterAddress,
- MetaFolder: *m.metaFolder,
- VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB),
- VolumePreallocate: *m.volumePreallocate,
+ Master: masterAddress,
+ MetaFolder: *m.metaFolder,
+ VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB),
+ VolumePreallocate: *m.volumePreallocate,
+ MaxParallelVacuumPerServer: *m.maxParallelVacuumPerServer,
// PulseSeconds: *m.pulseSeconds,
DefaultReplicaPlacement: *m.defaultReplication,
GarbageThreshold: *m.garbageThreshold,
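With the hunk above applied, the limit can be raised at startup, e.g. `weed master -maxParallelVacuumPerServer=4`; the next hunk exposes the same knob as `-master.maxParallelVacuumPerServer` on the combined `weed server` command.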
diff --git a/weed/command/server.go b/weed/command/server.go
index ddcaf1f7e..13d06a56b 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -92,6 +92,7 @@ func init() {
masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list")
masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
masterOptions.volumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
+ masterOptions.maxParallelVacuumPerServer = cmdServer.Flag.Int("master.maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel on one volume server")
masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "", "Default replication type if not specified.")
masterOptions.garbageThreshold = cmdServer.Flag.Float64("master.garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
masterOptions.metricsAddress = cmdServer.Flag.String("master.metrics.address", "", "Prometheus gateway address")
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 3cad627db..73f2b24cd 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -221,7 +221,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
resp := &master_pb.VacuumVolumeResponse{}
- ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize)
+ ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.option.MaxParallelVacuumPerServer, req.VolumeId, req.Collection, ms.preallocateSize)
return resp, nil
}
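The `VacuumVolume` RPC now picks up the server-wide setting rather than taking a parallelism argument of its own. As a rough illustration, a client could trigger a vacuum as in the sketch below; the `master_pb.NewSeaweedClient` constructor, the request field names, and the default master gRPC port 19333 are assumptions based on the generated protobuf code, not part of this diff.

```go
// Minimal sketch: trigger a vacuum through the master's gRPC API.
// Assumes the generated master_pb.SeaweedClient and plaintext gRPC;
// error handling is abbreviated.
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

func main() {
	conn, err := grpc.NewClient("localhost:19333",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := master_pb.NewSeaweedClient(conn)
	// VolumeId 0 vacuums the whole layout (see the Vacuum branch below);
	// the per-server parallelism now comes from the master's own
	// -maxParallelVacuumPerServer setting rather than from this request.
	_, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
		GarbageThreshold: 0.3,
		VolumeId:         0,
		Collection:       "",
	})
	if err != nil {
		log.Fatal(err)
	}
}
```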
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index ee28f3386..aefae7126 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -29,8 +29,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/shell"
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
)
const (
@@ -39,10 +39,11 @@ const (
)
type MasterOption struct {
- Master pb.ServerAddress
- MetaFolder string
- VolumeSizeLimitMB uint32
- VolumePreallocate bool
+ Master pb.ServerAddress
+ MetaFolder string
+ VolumeSizeLimitMB uint32
+ VolumePreallocate bool
+ MaxParallelVacuumPerServer int
// PulseSeconds int
DefaultReplicaPlacement string
GarbageThreshold float64
@@ -158,6 +159,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
ms.Topo.StartRefreshWritableVolumes(
ms.grpcDialOption,
ms.option.GarbageThreshold,
+ ms.option.MaxParallelVacuumPerServer,
topology.VolumeGrowStrategy.Threshold,
ms.preallocateSize,
)
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index e003d7d9f..47bc1918a 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -66,7 +66,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
// glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize)
+ ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.option.MaxParallelVacuumPerServer, 0, "", ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
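The same entry point is reachable over the master's admin HTTP API. A minimal sketch, assuming the handler is mounted at `/vol/vacuum` (the route itself is not shown in this diff; the `garbageThreshold` query parameter matches what the handler parses):

```go
// Sketch: kick off a vacuum via the master's HTTP admin endpoint.
// The /vol/vacuum path is an assumption; after vacuuming, the handler
// replies with the directory status (ms.dirStatusHandler above).
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:9333/vol/vacuum?garbageThreshold=0.3")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```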
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index d0ecd089a..60459d7b7 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -13,7 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage"
)
-func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, growThreshold float64, preallocate int64) {
+func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, concurrentVacuumLimitPerVolumeServer int, growThreshold float64, preallocate int64) {
go func() {
for {
if t.IsLeader() {
@@ -27,7 +27,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
for {
if t.IsLeader() {
if !t.isDisableVacuum {
- t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate)
+ t.Vacuum(grpcDialOption, garbageThreshold, concurrentVacuumLimitPerVolumeServer, 0, "", preallocate)
}
} else {
stats.MasterReplicaPlacementMismatch.Reset()
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index b1a8b66d4..eb2ca8c3b 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,7 +2,9 @@ package topology
import (
"context"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "sync"
"sync/atomic"
"time"
@@ -213,7 +215,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
-func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64) {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@@ -243,25 +245,83 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
}
} else {
- t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
+ t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate)
}
}
}
}
}
-func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
+func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64) {
volumeLayout.accessLock.RLock()
- tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
+ todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
for vid, locationList := range volumeLayout.vid2location {
- tmpMap[vid] = locationList.Copy()
+ todoVolumeMap[vid] = locationList.Copy()
}
volumeLayout.accessLock.RUnlock()
- for vid, locationList := range tmpMap {
- t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
+ // limiter for each volume server
+ limiter := make(map[NodeId]int)
+ var limiterLock sync.Mutex
+ for _, locationList := range todoVolumeMap {
+ for _, dn := range locationList.list {
+ if _, ok := limiter[dn.Id()]; !ok {
+ limiter[dn.Id()] = maxParallelVacuumPerServer
+ }
+ }
}
+
+ executor := util.NewLimitedConcurrentExecutor(100)
+
+ var wg sync.WaitGroup
+
+ for len(todoVolumeMap) > 0 {
+ pendingVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
+ for vid, locationList := range todoVolumeMap {
+ hasEnoughQuota := true
+ for _, dn := range locationList.list {
+ limiterLock.Lock()
+ quota := limiter[dn.Id()]
+ limiterLock.Unlock()
+ if quota <= 0 {
+ hasEnoughQuota = false
+ break
+ }
+ }
+ if !hasEnoughQuota {
+ pendingVolumeMap[vid] = locationList
+ continue
+ }
+
+ // debit the quota
+ for _, dn := range locationList.list {
+ limiterLock.Lock()
+ limiter[dn.Id()]--
+ limiterLock.Unlock()
+ }
+
+ wg.Add(1)
+ executor.Execute(func() {
+ defer wg.Done()
+ t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
+ // credit the quota
+ for _, dn := range locationList.list {
+ limiterLock.Lock()
+ limiter[dn.Id()]++
+ limiterLock.Unlock()
+ }
+ })
+ }
+
+ if len(todoVolumeMap) == len(pendingVolumeMap) {
+ time.Sleep(10 * time.Second)
+ }
+ todoVolumeMap = pendingVolumeMap
+ }
+
+ wg.Wait()
+
}
func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) {
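The rewritten `vacuumOneVolumeLayout` is the core of the change: it pre-fills a per-server quota map, debits the quota of every replica's server before dispatching a volume, credits it back when the vacuum finishes, and parks volumes whose servers are saturated in a pending map to retry later (sleeping 10 s when no progress can be made). A standalone sketch of the same pattern, with hypothetical `task`/`runVacuum` names and a channel semaphore standing in for `util.NewLimitedConcurrentExecutor`:

```go
// Standalone sketch of the per-server quota pattern used by
// vacuumOneVolumeLayout above. A task runs only while every server
// holding one of its replicas still has quota; otherwise it is
// deferred and retried on the next pass.
package main

import (
	"fmt"
	"sync"
	"time"
)

type task struct {
	id      int
	servers []string // servers holding a replica of this volume
}

func runVacuum(t task) {
	fmt.Printf("vacuuming volume %d on %v\n", t.id, t.servers)
	time.Sleep(50 * time.Millisecond)
}

func main() {
	tasks := []task{
		{1, []string{"vs1", "vs2"}},
		{2, []string{"vs1", "vs3"}},
		{3, []string{"vs2", "vs3"}},
	}
	const maxParallelPerServer = 1

	quota := make(map[string]int)
	var quotaLock sync.Mutex
	for _, t := range tasks {
		for _, s := range t.servers {
			quota[s] = maxParallelPerServer
		}
	}

	sem := make(chan struct{}, 100) // caps total concurrency, like NewLimitedConcurrentExecutor(100)
	var wg sync.WaitGroup

	todo := tasks
	for len(todo) > 0 {
		var pending []task
		for _, t := range todo {
			// Check-and-debit under one lock: the task may run only if
			// every server it touches has quota left.
			quotaLock.Lock()
			ok := true
			for _, s := range t.servers {
				if quota[s] <= 0 {
					ok = false
					break
				}
			}
			if ok {
				for _, s := range t.servers {
					quota[s]-- // debit
				}
			}
			quotaLock.Unlock()
			if !ok {
				pending = append(pending, t)
				continue
			}

			wg.Add(1)
			sem <- struct{}{}
			go func(t task) {
				defer wg.Done()
				defer func() { <-sem }()
				runVacuum(t)
				quotaLock.Lock()
				for _, s := range t.servers {
					quota[s]++ // credit back
				}
				quotaLock.Unlock()
			}(t)
		}
		if len(pending) == len(todo) {
			// No task could be scheduled; wait for running vacuums
			// to return quota (the diff sleeps 10 s here).
			time.Sleep(100 * time.Millisecond)
		}
		todo = pending
	}
	wg.Wait()
}
```

One design note: only the scheduling loop ever debits quota while workers only credit it back, so a deferred task can never be starved by a concurrent debit; the retry pass is guaranteed to make progress once any in-flight vacuum on the saturated servers completes.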