aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-11 10:32:17 -0700
committerChris Lu <chris.lu@gmail.com>2020-03-11 10:32:17 -0700
commitd439d837726f5fdb53e686275a29eaa02d66ad62 (patch)
tree8743a11e9c52f671bf3998932e563710be8549c0
parent4237a813ccc92011d740c31c22e5bc29b7d3316e (diff)
downloadseaweedfs-d439d837726f5fdb53e686275a29eaa02d66ad62.tar.xz
seaweedfs-d439d837726f5fdb53e686275a29eaa02d66ad62.zip
volume: follow compactionBytePerSecond
related to https://github.com/chrislusf/seaweedfs/issues/1108
-rw-r--r--weed/command/backup.go2
-rw-r--r--weed/command/compact.go2
-rw-r--r--weed/storage/store_vacuum.go3
-rw-r--r--weed/storage/volume_vacuum.go12
-rw-r--r--weed/storage/volume_vacuum_test.go2
5 files changed, 12 insertions, 9 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index eb2b5ba4a..615be80cf 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -119,7 +119,7 @@ func runBackup(cmd *Command, args []string) bool {
}
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
- if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil {
+ if err = v.Compact2(30*1024*1024*1024, 0); err != nil {
fmt.Printf("Compact Volume before synchronizing %v\n", err)
return true
}
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 85313b749..4e28aa725 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -50,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
} else {
- if err = v.Compact2(preallocate); err != nil {
+ if err = v.Compact2(preallocate, 0); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
}
diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go
index e94d9b516..38159496e 100644
--- a/weed/storage/store_vacuum.go
+++ b/weed/storage/store_vacuum.go
@@ -16,8 +16,7 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
}
func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error {
if v := s.findVolume(vid); v != nil {
- return v.Compact2(preallocate) // compactionBytePerSecond
- // return v.Compact(preallocate, compactionBytePerSecond)
+ return v.Compact2(preallocate, compactionBytePerSecond)
}
return fmt.Errorf("volume id %d is not found during compact", vid)
}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 5d0d63877..669d5dd6c 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -57,7 +57,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
}
// compact a volume based on deletions in .idx files
-func (v *Volume) Compact2(preallocate int64) error {
+func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error {
if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
return nil
@@ -73,7 +73,7 @@ func (v *Volume) Compact2(preallocate int64) error {
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
- return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate)
+ return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond)
}
func (v *Volume) CommitCompact() error {
@@ -366,7 +366,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
return
}
-func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64) (err error) {
+func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
srcDatBackend, dstDatBackend backend.BackendStorageFile
dataFile *os.File
@@ -395,6 +395,8 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
dstDatBackend.WriteAt(sb.Bytes(), 0)
newOffset := int64(sb.BlockSize())
+ writeThrottler := util.NewWriteThrottler(compactionBytePerSecond)
+
oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
offset, size := value.Offset, value.Size
@@ -419,7 +421,9 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
- newOffset += n.DiskSize(version)
+ delta := n.DiskSize(version)
+ newOffset += delta
+ writeThrottler.MaybeSlowdown(delta)
glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
return nil
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index 95f43d6ec..51f04c8b1 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -84,7 +84,7 @@ func TestCompaction(t *testing.T) {
}
startTime := time.Now()
- v.Compact2(0)
+ v.Compact2(0, 0)
speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds()
t.Logf("compaction speed: %.2f bytes/s", speed)