aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_vacuum.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_vacuum.go')
-rw-r--r--weed/storage/volume_vacuum.go14
1 files changed, 11 insertions, 3 deletions
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 067456dfa..d686e2b09 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -17,6 +17,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+type ProgressFunc func(processed int64) bool
+
func (v *Volume) garbageLevel() float64 {
if v.ContentSize() == 0 {
return 0
@@ -62,7 +64,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
}
// compact a volume based on deletions in .idx files
-func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error {
+func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error {
if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
return nil
@@ -83,7 +85,7 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro
if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
}
- return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond)
+ return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond, progressFn)
}
func (v *Volume) CommitCompact() error {
@@ -382,7 +384,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
return
}
-func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64, compactionBytePerSecond int64) (err error) {
+func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) {
var (
srcDatBackend, dstDatBackend backend.BackendStorageFile
dataFile *os.File
@@ -421,6 +423,12 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
return nil
}
+ if progressFn != nil {
+ if !progressFn(offset.ToActualOffset()) {
+ return fmt.Errorf("interrupted")
+ }
+ }
+
n := new(needle.Needle)
err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version)
if err != nil {