aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/rocksdb
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/rocksdb')
-rw-r--r--weed/filer/rocksdb/README.md41
-rw-r--r--weed/filer/rocksdb/rocksdb_store.go302
-rw-r--r--weed/filer/rocksdb/rocksdb_store_kv.go47
-rw-r--r--weed/filer/rocksdb/rocksdb_store_test.go117
-rw-r--r--weed/filer/rocksdb/rocksdb_ttl.go40
5 files changed, 547 insertions, 0 deletions
diff --git a/weed/filer/rocksdb/README.md b/weed/filer/rocksdb/README.md
new file mode 100644
index 000000000..6bae6d34e
--- /dev/null
+++ b/weed/filer/rocksdb/README.md
@@ -0,0 +1,41 @@
+# Prepare the compilation environment on Linux
+- sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test
+- sudo apt-get update -qq
+- sudo apt-get install gcc-6 g++-6 libsnappy-dev zlib1g-dev libbz2-dev -qq
+- export CXX="g++-6" CC="gcc-6"
+
+- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags2_2.0-1.1ubuntu1_amd64.deb
+- sudo dpkg -i libgflags2_2.0-1.1ubuntu1_amd64.deb
+- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags-dev_2.0-1.1ubuntu1_amd64.deb
+- sudo dpkg -i libgflags-dev_2.0-1.1ubuntu1_amd64.deb
+
+# Prepare the compilation environment on macOS
+```
+brew install snappy
+```
+
+# Install RocksDB
+```
+ export ROCKSDB_HOME=/Users/chris/dev/rocksdb
+
+ git clone https://github.com/facebook/rocksdb.git $ROCKSDB_HOME
+ pushd $ROCKSDB_HOME
+ make clean
+ make install-static
+ popd
+```
+
+# Install gorocksdb
+
+```
+export CGO_CFLAGS="-I$ROCKSDB_HOME/include"
+export CGO_LDFLAGS="-L$ROCKSDB_HOME -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"
+
+go get github.com/tecbot/gorocksdb
+```
+# Compile with RocksDB support
+
+```
+cd ~/go/src/github.com/chrislusf/seaweedfs/weed
+go install -tags rocksdb
+```
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
new file mode 100644
index 000000000..70c301725
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -0,0 +1,302 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "io"
+
+ "github.com/tecbot/gorocksdb"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// init appends a RocksDBStore to filer.Stores so the filer can select it.
+func init() {
+	rocksStore := &RocksDBStore{}
+	filer.Stores = append(filer.Stores, rocksStore)
+}
+
+// options groups the gorocksdb option handles that the store creates in
+// init and releases in close.
+type options struct {
+ // opt holds the database options handed to gorocksdb.OpenDb.
+ opt *gorocksdb.Options
+ // ro holds the read options used by the store's Get/GetBytes calls.
+ ro *gorocksdb.ReadOptions
+ // wo holds the write options used by the store's Put/Delete/Write calls.
+ wo *gorocksdb.WriteOptions
+}
+
+// init creates default database, read and write option handles.
+func (opt *options) init() {
+	opt.opt, opt.ro, opt.wo =
+		gorocksdb.NewDefaultOptions(),
+		gorocksdb.NewDefaultReadOptions(),
+		gorocksdb.NewDefaultWriteOptions()
+}
+
+// close destroys every option handle that is currently set and clears the
+// field afterwards, so calling close on a never-initialized value or calling
+// it twice is a harmless no-op instead of a nil dereference or a second
+// Destroy on the same handle.
+func (opt *options) close() {
+	if opt.opt != nil {
+		opt.opt.Destroy()
+		opt.opt = nil
+	}
+	if opt.ro != nil {
+		opt.ro.Destroy()
+		opt.ro = nil
+	}
+	if opt.wo != nil {
+		opt.wo.Destroy()
+		opt.wo = nil
+	}
+}
+
+// RocksDBStore is a filer store that keeps entries and key/value pairs in a
+// gorocksdb database.
+type RocksDBStore struct {
+ // path is not assigned anywhere in the visible code.
+ path string
+ // db is the database handle opened by initialize.
+ db *gorocksdb.DB
+ // options embeds the option handles (opt, ro, wo) used with db.
+ options
+}
+
+func (store *RocksDBStore) GetName() string {
+ return "rocksdb"
+}
+
+// Initialize reads the "<prefix>dir" setting from configuration and opens
+// the database in that directory.
+func (store *RocksDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+	return store.initialize(configuration.GetString(prefix + "dir"))
+}
+
+// initialize verifies that dir is writable, configures the database options
+// (create-if-missing, dynamic level bytes, TTL compaction filter) and opens
+// the database stored in dir.
+//
+// If the database cannot be opened, the option handles created here are
+// released again and the error is returned with the directory for context;
+// store.db is left untouched in that case.
+func (store *RocksDBStore) initialize(dir string) (err error) {
+	glog.Infof("filer store rocksdb dir: %s", dir)
+	if err = weed_util.TestFolderWritable(dir); err != nil {
+		return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+	}
+	store.options.init()
+	store.opt.SetCreateIfMissing(true)
+	// reduce write amplification
+	// also avoid expired data stored in highest level never get compacted
+	store.opt.SetLevelCompactionDynamicLevelBytes(true)
+	store.opt.SetCompactionFilter(NewTTLFilter())
+	// store.opt.SetMaxBackgroundCompactions(2)
+
+	db, openErr := gorocksdb.OpenDb(store.opt, dir)
+	if openErr != nil {
+		// Do not keep native option handles alive for a store that never opened.
+		store.options.close()
+		store.options = options{}
+		return fmt.Errorf("open rocksdb in %s: %v", dir, openErr)
+	}
+	store.db = db
+
+	return nil
+}
+
+// BeginTransaction is a no-op: it returns ctx unchanged and a nil error.
+func (store *RocksDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+// CommitTransaction is a no-op and always returns nil.
+func (store *RocksDBStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+// RollbackTransaction is a no-op and always returns nil.
+func (store *RocksDBStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+// InsertEntry encodes the entry's attributes and chunks and stores them
+// under the key derived from the entry's directory and name. An existing
+// value under the same key is overwritten.
+func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+	parentDir, baseName := entry.DirAndName()
+	entryKey := genKey(parentDir, baseName)
+
+	encoded, encodeErr := entry.EncodeAttributesAndChunks()
+	if encodeErr != nil {
+		return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, encodeErr)
+	}
+
+	if putErr := store.db.Put(store.wo, entryKey, encoded); putErr != nil {
+		return fmt.Errorf("persisting %s : %v", entry.FullPath, putErr)
+	}
+
+	return nil
+}
+
+// UpdateEntry stores the entry exactly like InsertEntry does; the write
+// replaces whatever value the key held before.
+func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+	err = store.InsertEntry(ctx, entry)
+	return err
+}
+
+// FindEntry looks up the entry stored for fullpath.
+//
+// It returns filer_pb.ErrNotFound when no value is stored for the key, a
+// wrapped error when the read itself fails, and a wrapped error (together
+// with the partially filled entry) when the stored value cannot be decoded.
+func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+	dir, name := fullpath.DirAndName()
+	key := genKey(dir, name)
+
+	data, err := store.db.Get(store.ro, key)
+	// Check the error first: a failed read must not be reported as "not found".
+	if err != nil {
+		return nil, fmt.Errorf("get %s : %v", fullpath, err)
+	}
+	if data == nil {
+		return nil, filer_pb.ErrNotFound
+	}
+	defer data.Free()
+
+	// NOTE(review): gorocksdb appears to report a missing key as a non-nil
+	// slice without data; an empty value is treated as "not found" as well.
+	// Confirm against the gorocksdb version in use.
+	if len(data.Data()) == 0 {
+		return nil, filer_pb.ErrNotFound
+	}
+
+	entry = &filer.Entry{
+		FullPath: fullpath,
+	}
+	err = entry.DecodeAttributesAndChunks(data.Data())
+	if err != nil {
+		return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+	}
+
+	return entry, nil
+}
+
+// DeleteEntry removes the value stored under the key derived from fullpath.
+func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+	parentDir, baseName := fullpath.DirAndName()
+
+	if delErr := store.db.Delete(store.wo, genKey(parentDir, baseName)); delErr != nil {
+		return fmt.Errorf("delete %s : %v", fullpath, delErr)
+	}
+
+	return nil
+}
+
+// DeleteFolderChildren deletes every key that starts with the hash of
+// fullpath. The keys are collected into one write batch while iterating and
+// the batch is written once the scan has finished.
+func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+	childPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+	deletions := gorocksdb.NewWriteBatch()
+	defer deletions.Destroy()
+
+	scanOpts := gorocksdb.NewDefaultReadOptions()
+	defer scanOpts.Destroy()
+	scanOpts.SetFillCache(false)
+
+	cursor := store.db.NewIterator(scanOpts)
+	defer cursor.Close()
+
+	// Queue each visited key for deletion and keep scanning.
+	queueDelete := func(key, _ []byte) bool {
+		deletions.Delete(key)
+		return true
+	}
+	if scanErr := enumerate(cursor, childPrefix, nil, false, -1, queueDelete); scanErr != nil {
+		return fmt.Errorf("delete list %s : %v", fullpath, scanErr)
+	}
+
+	if writeErr := store.db.Write(store.wo, deletions); writeErr != nil {
+		return fmt.Errorf("delete %s : %v", fullpath, writeErr)
+	}
+
+	return nil
+}
+
+func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int64, fn func(key, value []byte) bool) (err error) {
+
+ if len(lastKey) == 0 {
+ iter.Seek(prefix)
+ } else {
+ iter.Seek(lastKey)
+ if !includeLastKey {
+ if iter.Valid() {
+ if bytes.Equal(iter.Key().Data(), lastKey) {
+ iter.Next()
+ }
+ }
+ }
+ }
+
+ i := int64(0)
+ for ; iter.Valid(); iter.Next() {
+
+ if limit > 0 {
+ i++
+ if i > limit {
+ break
+ }
+ }
+
+ key := iter.Key().Data()
+
+ if !bytes.HasPrefix(key, prefix) {
+ break
+ }
+
+ ret := fn(key, iter.Value().Data())
+
+ if !ret {
+ break
+ }
+
+ }
+
+ if err := iter.Err(); err != nil {
+ return fmt.Errorf("prefix scan iterator: %v", err)
+ }
+ return nil
+}
+
+func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
+ }
+
+ ro := gorocksdb.NewDefaultReadOptions()
+ defer ro.Destroy()
+ ro.SetFillCache(false)
+
+ iter := store.db.NewIterator(ro)
+ defer iter.Close()
+ err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, func(key, value []byte) bool {
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ return true
+ }
+ entry := &filer.Entry{
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
+ }
+ lastFileName = fileName
+
+ // println("list", entry.FullPath, "chunks", len(entry.Chunks))
+ if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ return false
+ }
+ if !eachEntryFunc(entry) {
+ return false
+ }
+ return true
+ })
+ if err != nil {
+ return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
+ }
+
+ return lastFileName, err
+}
+
+// genKey builds the storage key of an entry: the MD5 hash of dirPath
+// followed by the bytes of fileName.
+func genKey(dirPath, fileName string) (key []byte) {
+	dirHash := hashToBytes(dirPath)
+	return append(dirHash, fileName...)
+}
+
+// genDirectoryKeyPrefix returns the MD5 hash of fullpath, followed by the
+// bytes of startFileName when that is non-empty.
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
+	dirHash := hashToBytes(string(fullpath))
+	if startFileName == "" {
+		return dirHash
+	}
+	return append(dirHash, startFileName...)
+}
+
+// getNameFromKey returns the part of key that follows the leading
+// md5.Size bytes, i.e. the file name appended after the directory hash.
+// The key must be at least md5.Size bytes long.
+func getNameFromKey(key []byte) string {
+	nameBytes := key[md5.Size:]
+	return string(nameBytes)
+}
+
+// hashToBytes returns the md5.Size-byte MD5 digest of dir. The digest is
+// used as the fixed-length directory part of every entry key.
+func hashToBytes(dir string) []byte {
+	hasher := md5.New()
+	io.WriteString(hasher, dir)
+	return hasher.Sum(nil)
+}
+
+// Shutdown closes the database and releases the option handles.
+//
+// It is safe to call on a store whose initialization failed or never ran,
+// and safe to call more than once: every handle is checked for nil before
+// use and cleared afterwards.
+func (store *RocksDBStore) Shutdown() {
+	if store.db != nil {
+		store.db.Close()
+		store.db = nil
+	}
+	// opt, ro and wo are always created together by options.init.
+	if store.opt != nil {
+		store.options.close()
+		store.options = options{}
+	}
+}
diff --git a/weed/filer/rocksdb/rocksdb_store_kv.go b/weed/filer/rocksdb/rocksdb_store_kv.go
new file mode 100644
index 000000000..cf1214d5b
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store_kv.go
@@ -0,0 +1,47 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+)
+
+// KvPut stores value under key, replacing any previous value.
+func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+	if putErr := store.db.Put(store.wo, key, value); putErr != nil {
+		return fmt.Errorf("kv put: %v", putErr)
+	}
+	return nil
+}
+
+// KvGet returns the value stored under key.
+//
+// A failed read is returned as a wrapped error; filer.ErrKvNotFound is
+// returned only when the read succeeded and produced no value.
+func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+	value, err = store.db.GetBytes(store.ro, key)
+
+	// Check the error first so that a read failure is not mistaken for a
+	// missing key.
+	if err != nil {
+		return nil, fmt.Errorf("kv get: %v", err)
+	}
+
+	if value == nil {
+		return nil, filer.ErrKvNotFound
+	}
+
+	return value, nil
+}
+
+// KvDelete removes the value stored under key.
+func (store *RocksDBStore) KvDelete(ctx context.Context, key []byte) (err error) {
+	if delErr := store.db.Delete(store.wo, key); delErr != nil {
+		return fmt.Errorf("kv delete: %v", delErr)
+	}
+	return nil
+}
diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go
new file mode 100644
index 000000000..439663524
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store_test.go
@@ -0,0 +1,117 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// TestCreateAndFind creates one file entry through the filer, reads it
+// back, and checks that both its parent directory and the root directory
+// list exactly one child.
+func TestCreateAndFind(t *testing.T) {
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+	dir, err := ioutil.TempDir("", "seaweedfs_filer_test")
+	if err != nil {
+		t.Fatalf("create temp dir: %v", err)
+	}
+	defer os.RemoveAll(dir)
+	store := &RocksDBStore{}
+	if err := store.initialize(dir); err != nil {
+		t.Fatalf("initialize store in %s: %v", dir, err)
+	}
+	// Deferred after RemoveAll, so the database is closed before its
+	// directory is deleted.
+	defer store.Shutdown()
+	testFiler.SetStore(store)
+
+	fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+	ctx := context.Background()
+
+	entry1 := &filer.Entry{
+		FullPath: fullpath,
+		Attr: filer.Attr{
+			Mode: 0440,
+			Uid:  1234,
+			Gid:  5678,
+		},
+	}
+
+	if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+		t.Fatalf("create entry %v: %v", entry1.FullPath, err)
+	}
+
+	entry, err := testFiler.FindEntry(ctx, fullpath)
+	if err != nil {
+		t.Fatalf("find entry: %v", err)
+	}
+
+	if entry.FullPath != entry1.FullPath {
+		t.Fatalf("find wrong entry: %v", entry.FullPath)
+	}
+
+	// checking the directory that directly contains the file
+	entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+	if err != nil {
+		t.Fatalf("list entries: %v", err)
+	}
+	if len(entries) != 1 {
+		t.Fatalf("list entries count: %v", len(entries))
+	}
+
+	// checking the root directory
+	entries, _, err = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+	if err != nil {
+		t.Fatalf("list root entries: %v", err)
+	}
+	if len(entries) != 1 {
+		t.Fatalf("list entries count: %v", len(entries))
+	}
+
+}
+
+// TestEmptyRoot checks that listing the root directory of a freshly
+// created store succeeds and returns no entries.
+func TestEmptyRoot(t *testing.T) {
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+	dir, err := ioutil.TempDir("", "seaweedfs_filer_test2")
+	if err != nil {
+		t.Fatalf("create temp dir: %v", err)
+	}
+	defer os.RemoveAll(dir)
+	store := &RocksDBStore{}
+	if err := store.initialize(dir); err != nil {
+		t.Fatalf("initialize store in %s: %v", dir, err)
+	}
+	// Deferred after RemoveAll, so the database is closed before its
+	// directory is deleted.
+	defer store.Shutdown()
+	testFiler.SetStore(store)
+
+	ctx := context.Background()
+
+	// checking the root directory
+	entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+	if err != nil {
+		t.Fatalf("list entries: %v", err)
+	}
+	if len(entries) != 0 {
+		t.Fatalf("list entries count: %v", len(entries))
+	}
+
+}
+
+// BenchmarkInsertEntry measures InsertEntry on a fresh store, writing one
+// distinct file entry per iteration.
+func BenchmarkInsertEntry(b *testing.B) {
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+	dir, err := ioutil.TempDir("", "seaweedfs_filer_bench")
+	if err != nil {
+		b.Fatalf("create temp dir: %v", err)
+	}
+	defer os.RemoveAll(dir)
+	store := &RocksDBStore{}
+	if err := store.initialize(dir); err != nil {
+		b.Fatalf("initialize store in %s: %v", dir, err)
+	}
+	// Deferred after RemoveAll, so the database is closed before its
+	// directory is deleted.
+	defer store.Shutdown()
+	testFiler.SetStore(store)
+
+	ctx := context.Background()
+
+	b.ReportAllocs()
+	// Keep directory creation and database opening out of the timed region.
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		entry := &filer.Entry{
+			FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)),
+			Attr: filer.Attr{
+				Crtime: time.Now(),
+				Mtime:  time.Now(),
+				Mode:   os.FileMode(0644),
+			},
+		}
+		if err := store.InsertEntry(ctx, entry); err != nil {
+			b.Fatalf("insert entry %v: %v", entry.FullPath, err)
+		}
+	}
+}
diff --git a/weed/filer/rocksdb/rocksdb_ttl.go b/weed/filer/rocksdb/rocksdb_ttl.go
new file mode 100644
index 000000000..faed22310
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_ttl.go
@@ -0,0 +1,40 @@
+//+build rocksdb
+
+package rocksdb
+
+import (
+ "time"
+
+ "github.com/tecbot/gorocksdb"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+)
+
+// TTLFilter is a compaction filter that drops entries whose TTL has
+// elapsed.
+type TTLFilter struct {
+ // skipLevel0 makes Filter leave values untouched when level is not
+ // greater than 0.
+ skipLevel0 bool
+}
+
+// NewTTLFilter returns a TTLFilter that does not filter at level 0.
+func NewTTLFilter() gorocksdb.CompactionFilter {
+	ttlFilter := new(TTLFilter)
+	ttlFilter.skipLevel0 = true
+	return ttlFilter
+}
+
+func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []byte) {
+ // decode could be slow, causing write stall
+ // level >0 sst can run compaction in parallel
+ if !t.skipLevel0 || level > 0 {
+ entry := filer.Entry{}
+ if err := entry.DecodeAttributesAndChunks(val); err == nil {
+ if entry.TtlSec > 0 &&
+ entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
+ return true, nil
+ }
+ }
+ }
+ return false, val
+}
+
+func (t *TTLFilter) Name() string {
+ return "TTLFilter"
+}