aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
authora <eddy@gfxlabs.io>2022-04-20 14:01:42 -0700
committera <eddy@gfxlabs.io>2022-04-20 14:01:42 -0700
commit1d6a9e66b608f77a0da9a6903802bb24ff0629d7 (patch)
tree7f3e02d6e69d10913d882c5f87d9156001e1b77c /weed/topology
parent846858fb436cc061c40c4f2565ed3682e3758596 (diff)
parentd1fd40358215a6237f51e0918659f74cc7269ff1 (diff)
downloadseaweedfs-1d6a9e66b608f77a0da9a6903802bb24ff0629d7.tar.xz
seaweedfs-1d6a9e66b608f77a0da9a6903802bb24ff0629d7.zip
Merge branch 'master' into a
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/topology_event_handling.go2
-rw-r--r--weed/topology/topology_vacuum.go14
-rw-r--r--weed/topology/volume_layout.go7
3 files changed, 18 insertions, 5 deletions
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 99acefaf5..fe3717233 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -25,7 +25,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
c := time.Tick(15 * time.Minute)
for _ = range c {
if t.IsLeader() {
- t.Vacuum(grpcDialOption, garbageThreshold, preallocate)
+ t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate)
}
}
}(garbageThreshold)
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 74d70bcdb..147220f4a 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -60,6 +60,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
}
return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
+
func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
locationlist *VolumeLocationList, preallocate int64) bool {
vl.accessLock.Lock()
@@ -116,6 +117,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
}
return isVacuumSuccess
}
+
func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
isReadOnly := false
@@ -144,6 +146,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
}
return isCommitSuccess
}
+
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
@@ -161,7 +164,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
-func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@@ -172,12 +175,19 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
// now only one vacuum process going on
- glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
+ glog.V(1).Infof("Start vacuum on demand with threshold: %f collection: %s volumeId: %d",
+ garbageThreshold, collection, volumeId)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
+ if collection != "" && collection != c.Name {
+ continue
+ }
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
+ if volumeId > 0 && volumeLayout.Lookup(needle.VolumeId(volumeId)) == nil {
+ continue
+ }
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
}
}
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index de840f18f..167aee8ea 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -440,8 +440,11 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- glog.V(0).Infof("Volume %d reaches full capacity.", vid)
- return vl.removeFromWritable(vid)
+ wasWritable := vl.removeFromWritable(vid)
+ if wasWritable {
+ glog.V(0).Infof("Volume %d reaches full capacity.", vid)
+ }
+ return wasWritable
}
func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) {