aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2021-01-05 13:24:45 -0800
committerGitHub <noreply@github.com>2021-01-05 13:24:45 -0800
commitab57767d4baa87fd1d950afe837f61dfbbd008ac (patch)
tree2298c3dc95cb977cd46ef36ebabcab93b274f9cb
parent9e670c2e2302df31322d8afda7ef8512ca498eba (diff)
parentb2e50f602f23f919b6837b1207056cf1ebc7fc94 (diff)
downloadseaweedfs-ab57767d4baa87fd1d950afe837f61dfbbd008ac.tar.xz
seaweedfs-ab57767d4baa87fd1d950afe837f61dfbbd008ac.zip
Merge pull request #1730 from qieqieplus/rocksdb
impl: TTL per entry for rocksdb; fix package name
-rw-r--r--weed/filer/rocksdb/rocksdb_store.go40
-rw-r--r--weed/filer/rocksdb/rocksdb_ttl.go41
2 files changed, 65 insertions, 16 deletions
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
index ca6391386..9e058a96d 100644
--- a/weed/filer/rocksdb/rocksdb_store.go
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -7,12 +7,14 @@ import (
"context"
"crypto/md5"
"fmt"
+ "io"
+
+ "github.com/tecbot/gorocksdb"
+
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
- rocksdb "github.com/tecbot/gorocksdb"
- "io"
)
func init() {
@@ -20,15 +22,15 @@ func init() {
}
type options struct {
- opt *rocksdb.Options
- ro *rocksdb.ReadOptions
- wo *rocksdb.WriteOptions
+ opt *gorocksdb.Options
+ ro *gorocksdb.ReadOptions
+ wo *gorocksdb.WriteOptions
}
func (opt *options) init() {
- opt.opt = rocksdb.NewDefaultOptions()
- opt.ro = rocksdb.NewDefaultReadOptions()
- opt.wo = rocksdb.NewDefaultWriteOptions()
+ opt.opt = gorocksdb.NewDefaultOptions()
+ opt.ro = gorocksdb.NewDefaultReadOptions()
+ opt.wo = gorocksdb.NewDefaultWriteOptions()
}
func (opt *options) close() {
@@ -39,7 +41,7 @@ func (opt *options) close() {
type RocksDBStore struct {
path string
- db *rocksdb.DB
+ db *gorocksdb.DB
options
}
@@ -59,7 +61,13 @@ func (store *RocksDBStore) initialize(dir string) (err error) {
}
store.options.init()
store.opt.SetCreateIfMissing(true)
- store.db, err = rocksdb.OpenDb(store.opt, dir)
+ // reduce write amplification
+ // also avoid expired data stored in highest level never get compacted
+ store.opt.SetLevelCompactionDynamicLevelBytes(true)
+ store.opt.SetCompactionFilter(NewTTLFilter())
+ // store.opt.SetMaxBackgroundCompactions(2)
+
+ store.db, err = gorocksdb.OpenDb(store.opt, dir)
return
}
@@ -116,7 +124,7 @@ func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data.Data()))
+ err = entry.DecodeAttributesAndChunks(data.Data())
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -141,10 +149,10 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
- batch := rocksdb.NewWriteBatch()
+ batch := gorocksdb.NewWriteBatch()
defer batch.Destroy()
- ro := rocksdb.NewDefaultReadOptions()
+ ro := gorocksdb.NewDefaultReadOptions()
defer ro.Destroy()
ro.SetFillCache(false)
@@ -167,7 +175,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil
}
-func enumerate(iter *rocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
+func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
if len(lastKey) == 0 {
iter.Seek(prefix)
@@ -231,7 +239,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName)
}
- ro := rocksdb.NewDefaultReadOptions()
+ ro := gorocksdb.NewDefaultReadOptions()
defer ro.Destroy()
ro.SetFillCache(false)
@@ -251,7 +259,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
- if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(value)); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
return false
diff --git a/weed/filer/rocksdb/rocksdb_ttl.go b/weed/filer/rocksdb/rocksdb_ttl.go
new file mode 100644
index 000000000..98918b5d7
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_ttl.go
@@ -0,0 +1,41 @@
+//+build rocksdb
+
+package rocksdb
+
+import (
+ "time"
+
+ "github.com/tecbot/gorocksdb"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+)
+
+type TTLFilter struct {
+ skipLevel0 bool
+}
+
+func NewTTLFilter() gorocksdb.CompactionFilter {
+ return &TTLFilter{
+ skipLevel0: true,
+ }
+}
+
+func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []byte) {
+ // decode could be slow, causing write stall
+ // level >0 sst can run compaction in parallel
+ if t.skipLevel0 && level == 0 {
+ return false, val
+ }
+ entry := filer.Entry{}
+ if err := entry.DecodeAttributesAndChunks(val); err == nil {
+ if entry.TtlSec == 0 ||
+ entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).After(time.Now()) {
+ return false, val
+ }
+ }
+ return true, nil
+}
+
+func (t *TTLFilter) Name() string {
+ return "TTLFilter"
+}