aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/storage/volume_vacuum.go46
-rw-r--r--weed/util/throttler.go34
2 files changed, 47 insertions, 33 deletions
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index d8141cf71..bbe17071e 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -236,15 +236,13 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
type VolumeFileScanner4Vacuum struct {
- version needle.Version
- v *Volume
- dst *os.File
- nm *NeedleMap
- newOffset int64
- now uint64
- compactionBytePerSecond int64
- lastSizeCounter int64
- lastSizeCheckTime time.Time
+ version needle.Version
+ v *Volume
+ dst *os.File
+ nm *NeedleMap
+ newOffset int64
+ now uint64
+ writeThrottler *util.WriteThrottler
}
func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
@@ -274,28 +272,11 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
}
delta := n.DiskSize(scanner.version)
scanner.newOffset += delta
- scanner.maybeSlowdown(delta)
+ scanner.writeThrottler.MaybeSlowdown(delta)
glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", scanner.newOffset, "data_size", n.Size)
}
return nil
}
-func (scanner *VolumeFileScanner4Vacuum) maybeSlowdown(delta int64) {
- if scanner.compactionBytePerSecond > 0 {
- scanner.lastSizeCounter += delta
- now := time.Now()
- elapsedDuration := now.Sub(scanner.lastSizeCheckTime)
- if elapsedDuration > 100*time.Millisecond {
- overLimitBytes := scanner.lastSizeCounter - scanner.compactionBytePerSecond/10
- if overLimitBytes > 0 {
- overRatio := float64(overLimitBytes) / float64(scanner.compactionBytePerSecond)
- sleepTime := time.Duration(overRatio*1000) * time.Millisecond
- // glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", scanner.lastSizeCounter, scanner.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio)
- time.Sleep(sleepTime)
- }
- scanner.lastSizeCounter, scanner.lastSizeCheckTime = 0, time.Now()
- }
- }
-}
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
@@ -312,12 +293,11 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
defer idx.Close()
scanner := &VolumeFileScanner4Vacuum{
- v: v,
- now: uint64(time.Now().Unix()),
- nm: NewBtreeNeedleMap(idx),
- dst: dst,
- compactionBytePerSecond: compactionBytePerSecond,
- lastSizeCheckTime: time.Now(),
+ v: v,
+ now: uint64(time.Now().Unix()),
+ nm: NewBtreeNeedleMap(idx),
+ dst: dst,
+ writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
return
diff --git a/weed/util/throttler.go b/weed/util/throttler.go
new file mode 100644
index 000000000..873161e37
--- /dev/null
+++ b/weed/util/throttler.go
@@ -0,0 +1,34 @@
+package util
+
+import "time"
+
+type WriteThrottler struct {
+ compactionBytePerSecond int64
+ lastSizeCounter int64
+ lastSizeCheckTime time.Time
+}
+
+func NewWriteThrottler(bytesPerSecond int64) *WriteThrottler {
+ return &WriteThrottler{
+ compactionBytePerSecond: bytesPerSecond,
+ lastSizeCheckTime: time.Now(),
+ }
+}
+
+func (wt *WriteThrottler) MaybeSlowdown(delta int64) {
+ if wt.compactionBytePerSecond > 0 {
+ wt.lastSizeCounter += delta
+ now := time.Now()
+ elapsedDuration := now.Sub(wt.lastSizeCheckTime)
+ if elapsedDuration > 100*time.Millisecond {
+ overLimitBytes := wt.lastSizeCounter - wt.compactionBytePerSecond/10
+ if overLimitBytes > 0 {
+ overRatio := float64(overLimitBytes) / float64(wt.compactionBytePerSecond)
+ sleepTime := time.Duration(overRatio*1000) * time.Millisecond
+ // glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", wt.lastSizeCounter, wt.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio)
+ time.Sleep(sleepTime)
+ }
+ wt.lastSizeCounter, wt.lastSizeCheckTime = 0, time.Now()
+ }
+ }
+}