diff options
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/store_vacuum.go | 4 | ||||
| -rw-r--r-- | weed/storage/volume_vacuum.go | 14 | ||||
| -rw-r--r-- | weed/storage/volume_vacuum_test.go | 2 |
3 files changed, 14 insertions, 6 deletions
diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index fe2033070..0d6e0b0f1 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -15,13 +15,13 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { } return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId) } -func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error { +func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error { if v := s.findVolume(vid); v != nil { s := stats.NewDiskStatus(v.dir) if int64(s.Free) < preallocate { return fmt.Errorf("free space: %d bytes, not enough for %d bytes", s.Free, preallocate) } - return v.Compact2(preallocate, compactionBytePerSecond) + return v.Compact2(preallocate, compactionBytePerSecond, progressFn) } return fmt.Errorf("volume id %d is not found during compact", vid) } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 067456dfa..d686e2b09 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -17,6 +17,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +type ProgressFunc func(processed int64) bool + func (v *Volume) garbageLevel() float64 { if v.ContentSize() == 0 { return 0 @@ -62,7 +64,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error } // compact a volume based on deletions in .idx files -func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error { +func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error { if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory return nil @@ -83,7 +85,7 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro if err := v.nm.Sync(); err != nil { glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err) } - return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond) + return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond, progressFn) } func (v *Volume) CommitCompact() error { @@ -382,7 +384,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca return } -func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64, compactionBytePerSecond int64) (err error) { +func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) { var ( srcDatBackend, dstDatBackend backend.BackendStorageFile dataFile *os.File @@ -421,6 +423,12 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str return nil } + if progressFn != nil { + if !progressFn(offset.ToActualOffset()) { + return fmt.Errorf("interrupted") + } + } + n := new(needle.Needle) err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version) if err != nil { diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 64f4b3b60..0177cb64d 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -83,7 +83,7 @@ func TestCompaction(t *testing.T) { } startTime := time.Now() - v.Compact2(0, 0) + v.Compact2(0, 0, nil) speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds() t.Logf("compaction speed: %.2f bytes/s", speed) |
