aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/topology_vacuum.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/topology_vacuum.go')
-rw-r--r--weed/topology/topology_vacuum.go18
1 files changed, 14 insertions, 4 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 74d70bcdb..7e030e5ac 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -60,6 +60,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
}
return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
+
func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
locationlist *VolumeLocationList, preallocate int64) bool {
vl.accessLock.Lock()
@@ -116,6 +117,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
}
return isVacuumSuccess
}
+
func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
isReadOnly := false
@@ -144,6 +146,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
}
return isCommitSuccess
}
+
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
@@ -161,7 +164,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
-func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@@ -172,23 +175,30 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
// now only one vacuum process going on
- glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
+ glog.V(1).Infof("Start vacuum on demand with threshold: %f collection: %s volumeId: %d",
+ garbageThreshold, collection, volumeId)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
+ if collection != "" && collection != c.Name {
+ continue
+ }
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
- t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
+ t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, volumeId, preallocate)
}
}
}
}
-func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
+func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, volumeId uint32, preallocate int64) {
volumeLayout.accessLock.RLock()
tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
for vid, locationList := range volumeLayout.vid2location {
+ if volumeId > 0 && volumeId != uint32(vid) {
+ continue
+ }
tmpMap[vid] = locationList.Copy()
}
volumeLayout.accessLock.RUnlock()