aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--unmaintained/compact_leveldb/compact_leveldb.go35
-rw-r--r--weed/command/scaffold.go6
-rw-r--r--weed/filer2/leveldb/leveldb_store.go9
-rw-r--r--weed/filer2/leveldb2/leveldb2_store.go190
-rw-r--r--weed/filer2/leveldb2/leveldb2_store_test.go88
5 files changed, 327 insertions, 1 deletions
diff --git a/unmaintained/compact_leveldb/compact_leveldb.go b/unmaintained/compact_leveldb/compact_leveldb.go
new file mode 100644
index 000000000..317356c3f
--- /dev/null
+++ b/unmaintained/compact_leveldb/compact_leveldb.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+ "flag"
+ "log"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/util"
+)
+
+var (
+ dir = flag.String("dir", ".", "data directory to store leveldb files")
+)
+
+func main() {
+
+ flag.Parse()
+
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10,
+ OpenFilesCacheCapacity: -1,
+ }
+
+ db, err := leveldb.OpenFile(*dir, opts)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer db.Close()
+ if err := db.CompactRange(util.Range{}); err != nil {
+ log.Fatal(err)
+ }
+}
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index e56a1b5ba..a0908912c 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -63,6 +63,12 @@ enabled = false
[leveldb]
# local on disk, mostly for simple single-machine setup, fairly scalable
+enabled = false
+dir = "." # directory to store level db files
+
+[leveldb2]
+# local on disk, mostly for simple single-machine setup, fairly scalable
+# faster than previous leveldb, recommended.
enabled = true
dir = "." # directory to store level db files
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index 5b3a63959..a55c58153 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
)
@@ -39,7 +40,13 @@ func (store *LevelDBStore) initialize(dir string) (err error) {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
- if store.db, err = leveldb.OpenFile(dir, nil); err != nil {
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 4,
+ }
+
+ if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
glog.Infof("filer store open dir %s: %v", dir, err)
return
}
diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go
new file mode 100644
index 000000000..c0e2cd877
--- /dev/null
+++ b/weed/filer2/leveldb2/leveldb2_store.go
@@ -0,0 +1,190 @@
+package leveldb
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &LevelDB2Store{})
+}
+
+// known theoretically 128 bit MD5 collision of 2 directories.
+// (but really? please show some real examples)
+type LevelDB2Store struct {
+ db *leveldb.DB
+}
+
+func (store *LevelDB2Store) GetName() string {
+ return "leveldb2"
+}
+
+func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration) (err error) {
+ dir := configuration.GetString("dir")
+ return store.initialize(dir)
+}
+
+func (store *LevelDB2Store) initialize(dir string) (err error) {
+ glog.Infof("filer store leveldb2 dir: %s", dir)
+ if err := weed_util.TestFolderWritable(dir); err != nil {
+ return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+ }
+
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10,
+ }
+
+ if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
+ glog.Infof("filer store open dir %s: %v", dir, err)
+ return
+ }
+ return
+}
+
+func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ key := genKey(entry.DirAndName())
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ err = store.db.Put(key, value, nil)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+ key := genKey(fullpath.DirAndName())
+
+ data, err := store.db.Get(key, nil)
+
+ if err == leveldb.ErrNotFound {
+ return nil, filer2.ErrNotFound
+ }
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ }
+
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ key := genKey(fullpath.DirAndName())
+
+ err = store.db.Delete(key, nil)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer2.Entry, err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ if fileName == startFileName && !inclusive {
+ continue
+ }
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer2.Entry{
+ FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+ iter.Release()
+
+ return entries, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+
+ return string(key[8:])
+
+}
+
+func hashToBytes(dir string) []byte {
+ h := md5.New()
+ io.WriteString(h, dir)
+
+ b := h.Sum(nil)
+
+ return b
+}
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
new file mode 100644
index 000000000..5d6e660de
--- /dev/null
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -0,0 +1,88 @@
+package leveldb
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "io/ioutil"
+ "os"
+ "testing"
+)
+
+func TestCreateAndFind(t *testing.T) {
+ filer := filer2.NewFiler(nil, nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+ defer os.RemoveAll(dir)
+ store := &LevelDB2Store{}
+ store.initialize(dir)
+ filer.SetStore(store)
+ filer.DisableDirectoryCache()
+
+ fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
+
+ ctx := context.Background()
+
+ entry1 := &filer2.Entry{
+ FullPath: fullpath,
+ Attr: filer2.Attr{
+ Mode: 0440,
+ Uid: 1234,
+ Gid: 5678,
+ },
+ }
+
+ if err := filer.CreateEntry(ctx, entry1); err != nil {
+ t.Errorf("create entry %v: %v", entry1.FullPath, err)
+ return
+ }
+
+ entry, err := filer.FindEntry(ctx, fullpath)
+
+ if err != nil {
+ t.Errorf("find entry: %v", err)
+ return
+ }
+
+ if entry.FullPath != entry1.FullPath {
+ t.Errorf("find wrong entry: %v", entry.FullPath)
+ return
+ }
+
+ // checking one upper directory
+ entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+ // checking one upper directory
+ entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+ filer := filer2.NewFiler(nil, nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+ defer os.RemoveAll(dir)
+ store := &LevelDB2Store{}
+ store.initialize(dir)
+ filer.SetStore(store)
+ filer.DisableDirectoryCache()
+
+ ctx := context.Background()
+
+ // checking one upper directory
+ entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ if err != nil {
+ t.Errorf("list entries: %v", err)
+ return
+ }
+ if len(entries) != 0 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}