aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2021-01-04 10:41:57 -0800
committerGitHub <noreply@github.com>2021-01-04 10:41:57 -0800
commit9fa2d20357fb319d88ed99af50bfc927ecf0656b (patch)
tree8206e863835c978de347814afae35ba94174fd7b
parent2ce86f308ea4836cf534e50dc1388932253b5cd5 (diff)
parentebb223c190385a92c72b4d8096e71558c9f03354 (diff)
downloadseaweedfs-9fa2d20357fb319d88ed99af50bfc927ecf0656b.tar.xz
seaweedfs-9fa2d20357fb319d88ed99af50bfc927ecf0656b.zip
Merge pull request #1727 from qieqieplus/rocksdb
fix #1726
-rw-r--r--weed/filer/rocksdb/rocksdb_store.go81
-rw-r--r--weed/filer/rocksdb/rocksdb_store_kv.go10
2 files changed, 48 insertions, 43 deletions
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
index a8992cf03..ca6391386 100644
--- a/weed/filer/rocksdb/rocksdb_store.go
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -11,7 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/tecbot/gorocksdb"
+ rocksdb "github.com/tecbot/gorocksdb"
"io"
)
@@ -19,9 +19,28 @@ func init() {
filer.Stores = append(filer.Stores, &RocksDBStore{})
}
+type options struct {
+ opt *rocksdb.Options
+ ro *rocksdb.ReadOptions
+ wo *rocksdb.WriteOptions
+}
+
+func (opt *options) init() {
+ opt.opt = rocksdb.NewDefaultOptions()
+ opt.ro = rocksdb.NewDefaultReadOptions()
+ opt.wo = rocksdb.NewDefaultWriteOptions()
+}
+
+func (opt *options) close() {
+ opt.opt.Destroy()
+ opt.ro.Destroy()
+ opt.wo.Destroy()
+}
+
type RocksDBStore struct {
path string
- db *gorocksdb.DB
+ db *rocksdb.DB
+ options
}
func (store *RocksDBStore) GetName() string {
@@ -38,10 +57,9 @@ func (store *RocksDBStore) initialize(dir string) (err error) {
if err := weed_util.TestFolderWritable(dir); err != nil {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
-
- options := gorocksdb.NewDefaultOptions()
- options.SetCreateIfMissing(true)
- store.db, err = gorocksdb.OpenDb(options, dir)
+ store.options.init()
+ store.opt.SetCreateIfMissing(true)
+ store.db, err = rocksdb.OpenDb(store.opt, dir)
return
}
@@ -65,8 +83,7 @@ func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- wo := gorocksdb.NewDefaultWriteOptions()
- err = store.db.Put(wo, key, value)
+ err = store.db.Put(store.wo, key, value)
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
@@ -85,21 +102,21 @@ func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
dir, name := fullpath.DirAndName()
key := genKey(dir, name)
-
- ro := gorocksdb.NewDefaultReadOptions()
- data, err := store.db.GetBytes(ro, key)
+ data, err := store.db.Get(store.ro, key)
if data == nil {
return nil, filer_pb.ErrNotFound
}
+ defer data.Free()
+
if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data.Data()))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -113,8 +130,7 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
dir, name := fullpath.DirAndName()
key := genKey(dir, name)
- wo := gorocksdb.NewDefaultWriteOptions()
- err = store.db.Delete(wo, key)
+ err = store.db.Delete(store.wo, key)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
@@ -125,10 +141,13 @@ func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
- batch := new(gorocksdb.WriteBatch)
+ batch := rocksdb.NewWriteBatch()
+ defer batch.Destroy()
- ro := gorocksdb.NewDefaultReadOptions()
+ ro := rocksdb.NewDefaultReadOptions()
+ defer ro.Destroy()
ro.SetFillCache(false)
+
iter := store.db.NewIterator(ro)
defer iter.Close()
err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
@@ -139,8 +158,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return fmt.Errorf("delete list %s : %v", fullpath, err)
}
- wo := gorocksdb.NewDefaultWriteOptions()
- err = store.db.Write(wo, batch)
+ err = store.db.Write(store.wo, batch)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
@@ -149,7 +167,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil
}
-func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
+func enumerate(iter *rocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
if len(lastKey) == 0 {
iter.Seek(prefix)
@@ -157,11 +175,7 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey
iter.Seek(lastKey)
if !includeLastKey {
- k := iter.Key()
- v := iter.Value()
- key := k.Data()
- defer k.Free()
- defer v.Free()
+ key := iter.Key().Data()
if !bytes.HasPrefix(key, prefix) {
return nil
@@ -184,21 +198,13 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey
}
}
- k := iter.Key()
- v := iter.Value()
- key := k.Data()
- value := v.Data()
+ key := iter.Key().Data()
if !bytes.HasPrefix(key, prefix) {
- k.Free()
- v.Free()
break
}
- ret := fn(key, value)
-
- k.Free()
- v.Free()
+ ret := fn(key, iter.Value().Data())
if !ret {
break
@@ -225,8 +231,10 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName)
}
- ro := gorocksdb.NewDefaultReadOptions()
+ ro := rocksdb.NewDefaultReadOptions()
+ defer ro.Destroy()
ro.SetFillCache(false)
+
iter := store.db.NewIterator(ro)
defer iter.Close()
err = enumerate(iter, directoryPrefix, lastFileStart, inclusive, limit, func(key, value []byte) bool {
@@ -290,4 +298,5 @@ func hashToBytes(dir string) []byte {
func (store *RocksDBStore) Shutdown() {
store.db.Close()
+ store.options.close()
}
diff --git a/weed/filer/rocksdb/rocksdb_store_kv.go b/weed/filer/rocksdb/rocksdb_store_kv.go
index 093a905e8..cf1214d5b 100644
--- a/weed/filer/rocksdb/rocksdb_store_kv.go
+++ b/weed/filer/rocksdb/rocksdb_store_kv.go
@@ -5,15 +5,13 @@ package rocksdb
import (
"context"
"fmt"
- "github.com/tecbot/gorocksdb"
"github.com/chrislusf/seaweedfs/weed/filer"
)
func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- wo := gorocksdb.NewDefaultWriteOptions()
- err = store.db.Put(wo, key, value)
+ err = store.db.Put(store.wo, key, value)
if err != nil {
return fmt.Errorf("kv put: %v", err)
@@ -24,8 +22,7 @@ func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte)
func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- ro := gorocksdb.NewDefaultReadOptions()
- value, err = store.db.GetBytes(ro, key)
+ value, err = store.db.GetBytes(store.ro, key)
if value == nil {
return nil, filer.ErrKvNotFound
@@ -40,8 +37,7 @@ func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte,
func (store *RocksDBStore) KvDelete(ctx context.Context, key []byte) (err error) {
- wo := gorocksdb.NewDefaultWriteOptions()
- err = store.db.Delete(wo, key)
+ err = store.db.Delete(store.wo, key)
if err != nil {
return fmt.Errorf("kv delete: %v", err)