diff options
Diffstat (limited to 'weed/storage/volume_vacuum.go')
| -rw-r--r-- | weed/storage/volume_vacuum.go | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 067456dfa..d686e2b09 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -17,6 +17,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +type ProgressFunc func(processed int64) bool + func (v *Volume) garbageLevel() float64 { if v.ContentSize() == 0 { return 0 @@ -62,7 +64,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error } // compact a volume based on deletions in .idx files -func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error { +func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error { if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory return nil @@ -83,7 +85,7 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro if err := v.nm.Sync(); err != nil { glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err) } - return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond) + return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond, progressFn) } func (v *Volume) CommitCompact() error { @@ -382,7 +384,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca return } -func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64, compactionBytePerSecond int64) (err error) { +func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) { var ( srcDatBackend, dstDatBackend backend.BackendStorageFile dataFile *os.File @@ -421,6 +423,12 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str return nil } + if progressFn != nil { + if !progressFn(offset.ToActualOffset()) { + return fmt.Errorf("interrupted") + } + } + n := new(needle.Needle) err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version) if err != nil { |
