about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Chris Lu <chris.lu@gmail.com> 2019-05-03 17:22:39 -0700
committer Chris Lu <chris.lu@gmail.com> 2019-05-03 17:22:39 -0700
commit b335f81a4fb51e57b0da37b2b662e823369459f6 (patch)
tree 7eeb6af906d81bf1a67aae1b9f627464f355908c
parent f0f981e7c8bd74f3df277539407bcb6b6a483050 (diff)
download seaweedfs-b335f81a4fb51e57b0da37b2b662e823369459f6.tar.xz
download seaweedfs-b335f81a4fb51e57b0da37b2b662e823369459f6.zip
volume: add option to limit compaction speed
-rw-r--r-- weed/command/backup.go 2
-rw-r--r-- weed/command/compact.go 2
-rw-r--r-- weed/command/server.go 1
-rw-r--r-- weed/command/volume.go 3
-rw-r--r-- weed/server/volume_grpc_vacuum.go 2
-rw-r--r-- weed/server/volume_server.go 26
-rw-r--r-- weed/storage/store_vacuum.go 4
-rw-r--r-- weed/storage/volume_vacuum.go 52
-rw-r--r-- weed/storage/volume_vacuum_test.go 10
9 files changed, 69 insertions, 33 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 51a89a5af..022e784c7 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -91,7 +91,7 @@ func runBackup(cmd *Command, args []string) bool {
}
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
- if err = v.Compact(0); err != nil {
+ if err = v.Compact(0, 0); err != nil {
fmt.Printf("Compact Volume before synchronizing %v\n", err)
return true
}
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 3ac09259e..79d50c095 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -43,7 +43,7 @@ func runCompact(cmd *Command, args []string) bool {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
if *compactMethod == 0 {
- if err = v.Compact(preallocate); err != nil {
+ if err = v.Compact(preallocate, 0); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
} else {
diff --git a/weed/command/server.go b/weed/command/server.go
index 228594ad0..ce402f1cd 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -94,6 +94,7 @@ func init() {
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
+ serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
diff --git a/weed/command/volume.go b/weed/command/volume.go
index b87555456..4d34fbc1e 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -43,6 +43,7 @@ type VolumeServerOptions struct {
readRedirect *bool
cpuProfile *string
memProfile *string
+ compactionMBPerSecond *int
}
func init() {
@@ -63,6 +64,7 @@ func init() {
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
+ v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit compaction speed in mega bytes per second")
}
var cmdVolume = &Command{
@@ -157,6 +159,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
strings.Split(masters, ","), *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList,
*v.fixJpgOrientation, *v.readRedirect,
+ *v.compactionMBPerSecond,
)
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index 4aa6588cb..24f982241 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -28,7 +28,7 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
- err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate)
+ err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond)
if err != nil {
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 8e77ec570..be1b433f7 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -20,9 +20,10 @@ type VolumeServer struct {
guard *security.Guard
grpcDialOption grpc.DialOption
- needleMapKind storage.NeedleMapType
- FixJpgOrientation bool
- ReadRedirect bool
+ needleMapKind storage.NeedleMapType
+ FixJpgOrientation bool
+ ReadRedirect bool
+ compactionBytePerSecond int64
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -33,20 +34,23 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
dataCenter string, rack string,
whiteList []string,
fixJpgOrientation bool,
- readRedirect bool) *VolumeServer {
+ readRedirect bool,
+ compactionMBPerSecond int,
+) *VolumeServer {
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
enableUiAccess := v.GetBool("access.ui")
vs := &VolumeServer{
- pulseSeconds: pulseSeconds,
- dataCenter: dataCenter,
- rack: rack,
- needleMapKind: needleMapKind,
- FixJpgOrientation: fixJpgOrientation,
- ReadRedirect: readRedirect,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
+ pulseSeconds: pulseSeconds,
+ dataCenter: dataCenter,
+ rack: rack,
+ needleMapKind: needleMapKind,
+ FixJpgOrientation: fixJpgOrientation,
+ ReadRedirect: readRedirect,
+ grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
+ compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
}
vs.MasterNodes = masterNodes
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go
index 5f982a8c3..b1f1a6277 100644
--- a/weed/storage/store_vacuum.go
+++ b/weed/storage/store_vacuum.go
@@ -14,9 +14,9 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
}
return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId)
}
-func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64) error {
+func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error {
if v := s.findVolume(vid); v != nil {
- return v.Compact(preallocate)
+ return v.Compact(preallocate, compactionBytePerSecond)
}
return fmt.Errorf("volume id %d is not found during compact", vid)
}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index e31908764..d8141cf71 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -18,7 +18,7 @@ func (v *Volume) garbageLevel() float64 {
return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
}
-func (v *Volume) Compact(preallocate int64) error {
+func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
glog.V(3).Infof("Compacting volume %d ...", v.Id)
//no need to lock for copy on write
//v.accessLock.Lock()
@@ -29,7 +29,7 @@ func (v *Volume) Compact(preallocate int64) error {
v.lastCompactIndexOffset = v.nm.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
- return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate)
+ return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
}
func (v *Volume) Compact2() error {
@@ -236,12 +236,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
type VolumeFileScanner4Vacuum struct {
- version needle.Version
- v *Volume
- dst *os.File
- nm *NeedleMap
- newOffset int64
- now uint64
+ version needle.Version
+ v *Volume
+ dst *os.File
+ nm *NeedleMap
+ newOffset int64
+ now uint64
+ compactionBytePerSecond int64
+ lastSizeCounter int64
+ lastSizeCheckTime time.Time
}
func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
@@ -269,13 +272,32 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
if _, _, _, err := n.Append(scanner.dst, scanner.v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
- scanner.newOffset += n.DiskSize(scanner.version)
+ delta := n.DiskSize(scanner.version)
+ scanner.newOffset += delta
+ scanner.maybeSlowdown(delta)
glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", scanner.newOffset, "data_size", n.Size)
}
return nil
}
+func (scanner *VolumeFileScanner4Vacuum) maybeSlowdown(delta int64) {
+ if scanner.compactionBytePerSecond > 0 {
+ scanner.lastSizeCounter += delta
+ now := time.Now()
+ elapsedDuration := now.Sub(scanner.lastSizeCheckTime)
+ if elapsedDuration > 100*time.Millisecond {
+ overLimitBytes := scanner.lastSizeCounter - scanner.compactionBytePerSecond/10
+ if overLimitBytes > 0 {
+ overRatio := float64(overLimitBytes) / float64(scanner.compactionBytePerSecond)
+ sleepTime := time.Duration(overRatio*1000) * time.Millisecond
+ // glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", scanner.lastSizeCounter, scanner.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio)
+ time.Sleep(sleepTime)
+ }
+ scanner.lastSizeCounter, scanner.lastSizeCheckTime = 0, time.Now()
+ }
+ }
+}
-func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64) (err error) {
+func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
dst, idx *os.File
)
@@ -290,10 +312,12 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
defer idx.Close()
scanner := &VolumeFileScanner4Vacuum{
- v: v,
- now: uint64(time.Now().Unix()),
- nm: NewBtreeNeedleMap(idx),
- dst: dst,
+ v: v,
+ now: uint64(time.Now().Unix()),
+ nm: NewBtreeNeedleMap(idx),
+ dst: dst,
+ compactionBytePerSecond: compactionBytePerSecond,
+ lastSizeCheckTime: time.Now(),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
return
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index d038eeda3..54899c788 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -5,6 +5,7 @@ import (
"math/rand"
"os"
"testing"
+ "time"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -72,8 +73,8 @@ func TestCompaction(t *testing.T) {
t.Fatalf("volume creation: %v", err)
}
- beforeCommitFileCount := 1000
- afterCommitFileCount := 1000
+ beforeCommitFileCount := 10000
+ afterCommitFileCount := 10000
infos := make([]*needleInfo, beforeCommitFileCount+afterCommitFileCount)
@@ -81,7 +82,10 @@ func TestCompaction(t *testing.T) {
doSomeWritesDeletes(i, v, t, infos)
}
- v.Compact(0)
+ startTime := time.Now()
+ v.Compact(0, 1024*1024)
+ speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds()
+ t.Logf("compaction speed: %.2f bytes/s", speed)
for i := 1; i <= afterCommitFileCount; i++ {
doSomeWritesDeletes(i+beforeCommitFileCount, v, t, infos)