aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordsd <60881537+dsd2077@users.noreply.github.com>2024-12-19 03:56:40 +0800
committerGitHub <noreply@github.com>2024-12-18 11:56:40 -0800
commit72af97162fa4b76557c9160a3809b0a9dd67cbbd (patch)
tree7eee795f9de27a2557907ddbd81893f154902d77
parent9fbc4ea417d7af358270a53fdd4f7f37a37d82d0 (diff)
downloadseaweedfs-72af97162fa4b76557c9160a3809b0a9dd67cbbd.tar.xz
seaweedfs-72af97162fa4b76557c9160a3809b0a9dd67cbbd.zip
[shell] feat:stop vacuum immediately once volume.vacuum.disable was executed (#6375)
stop vacuum immediately once volume.vacuum.disable was executed Co-authored-by: dsd <dsd2019@foxmail.com>
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/server/master_server_handlers_admin.go7
-rw-r--r--weed/topology/topology_event_handling.go2
-rw-r--r--weed/topology/topology_vacuum.go24
4 files changed, 25 insertions, 10 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 24e9c058c..3a383e259 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -271,7 +271,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
resp := &master_pb.VacuumVolumeResponse{}
- ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.option.MaxParallelVacuumPerServer, req.VolumeId, req.Collection, ms.preallocateSize)
+ ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.option.MaxParallelVacuumPerServer, req.VolumeId, req.Collection, ms.preallocateSize, false)
return resp, nil
}
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 452a4194e..fb0503e33 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -3,12 +3,13 @@ package weed_server
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"math/rand/v2"
"net/http"
"strconv"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@@ -66,7 +67,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
// glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.option.MaxParallelVacuumPerServer, 0, "", ms.preallocateSize)
+ ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.option.MaxParallelVacuumPerServer, 0, "", ms.preallocateSize, false)
ms.dirStatusHandler(w, r)
}
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 109f29ee0..e3ad8f2dc 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -27,7 +27,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
for {
if t.IsLeader() {
if !t.isDisableVacuum {
- t.Vacuum(grpcDialOption, garbageThreshold, concurrentVacuumLimitPerVolumeServer, 0, "", preallocate)
+ t.Vacuum(grpcDialOption, garbageThreshold, concurrentVacuumLimitPerVolumeServer, 0, "", preallocate, true)
}
} else {
stats.MasterReplicaPlacementMismatch.Reset()
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index eb2ca8c3b..83be65d7c 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,12 +2,13 @@ package topology
import (
"context"
- "github.com/seaweedfs/seaweedfs/weed/util"
"io"
"sync"
"sync/atomic"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
@@ -215,11 +216,12 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
-func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64) {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64, automatic bool) {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
if !swapped {
+ glog.V(0).Infof("Vacuum is already running")
return
}
defer atomic.StoreInt64(&t.vacuumLockCounter, 0)
@@ -245,14 +247,21 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
}
} else {
- t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate)
+ t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate, automatic)
}
}
+ if automatic && t.isDisableVacuum {
+ break
+ }
+ }
+ if automatic && t.isDisableVacuum {
+ glog.V(0).Infof("Vacuum is disabled")
+ break
}
}
}
-func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64) {
+func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64, automatic bool) {
volumeLayout.accessLock.RLock()
todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
@@ -312,8 +321,13 @@ func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeL
limiterLock.Unlock()
}
})
+ if automatic && t.isDisableVacuum {
+ break
+ }
+ }
+ if automatic && t.isDisableVacuum {
+ break
}
-
if len(todoVolumeMap) == len(pendingVolumeMap) {
time.Sleep(10 * time.Second)
}