diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-01-21 00:47:27 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-01-21 00:47:27 -0800 |
| commit | a9e6db1a8e964f10571a5e11c197fd2da141c6f7 (patch) | |
| tree | 0a22db3d6c68178d8e913c1bffd550ae4127a7e1 /weed/filer/leveldb3/leveldb3_store.go | |
| parent | f0455dee683e831487c345a575b7894c0e2bf61a (diff) | |
| parent | 84f05787f8eecfcb61e49882346ad5855b6bb784 (diff) | |
| download | seaweedfs-origin/ftp.tar.xz seaweedfs-origin/ftp.zip | |
Merge branch 'master' into ftporigin/ftp
Diffstat (limited to 'weed/filer/leveldb3/leveldb3_store.go')
| -rw-r--r-- | weed/filer/leveldb3/leveldb3_store.go | 375 |
1 files changed, 375 insertions, 0 deletions
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go new file mode 100644 index 000000000..24e00edc7 --- /dev/null +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -0,0 +1,375 @@ +package leveldb + +import ( + "bytes" + "context" + "crypto/md5" + "fmt" + "github.com/syndtr/goleveldb/leveldb" + leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/opt" + leveldb_util "github.com/syndtr/goleveldb/leveldb/util" + "io" + "os" + "strings" + "sync" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_util "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + DEFAULT = "_main" +) + +func init() { + filer.Stores = append(filer.Stores, &LevelDB3Store{}) +} + +type LevelDB3Store struct { + dir string + dbs map[string]*leveldb.DB + dbsLock sync.RWMutex +} + +func (store *LevelDB3Store) GetName() string { + return "leveldb3" +} + +func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + dir := configuration.GetString(prefix + "dir") + return store.initialize(dir) +} + +func (store *LevelDB3Store) initialize(dir string) (err error) { + glog.Infof("filer store leveldb3 dir: %s", dir) + if err := weed_util.TestFolderWritable(dir); err != nil { + return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) + } + store.dir = dir + + db, loadDbErr := store.loadDB(DEFAULT) + if loadDbErr != nil { + return loadDbErr + } + store.dbs = make(map[string]*leveldb.DB) + store.dbs[DEFAULT] = db + + return +} + +func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) { + + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 4, + } + if name != DEFAULT { + opts = &opt.Options{ + BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 4, + } + } + + dbFolder := fmt.Sprintf("%s/%s", store.dir, name) + os.MkdirAll(dbFolder, 0755) + db, dbErr := leveldb.OpenFile(dbFolder, opts) + if leveldb_errors.IsCorrupted(dbErr) { + db, dbErr = leveldb.RecoverFile(dbFolder, opts) + } + if dbErr != nil { + glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr) + return nil, dbErr + } + return db, nil +} + +func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bool) (*leveldb.DB, string, weed_util.FullPath, error) { + + store.dbsLock.RLock() + + defaultDB := store.dbs[DEFAULT] + if !strings.HasPrefix(string(fullpath), "/buckets/") { + store.dbsLock.RUnlock() + return defaultDB, DEFAULT, fullpath, nil + } + + // detect bucket + bucketAndObjectKey := string(fullpath)[len("/buckets/"):] + t := strings.Index(bucketAndObjectKey, "/") + if t < 0 && !isForChildren { + store.dbsLock.RUnlock() + return defaultDB, DEFAULT, fullpath, nil + } + bucket := bucketAndObjectKey + shortPath := weed_util.FullPath("/") + if t > 0 { + bucket = bucketAndObjectKey[:t] + shortPath = weed_util.FullPath(bucketAndObjectKey[t:]) + } + + if db, found := store.dbs[bucket]; found { + store.dbsLock.RUnlock() + return db, bucket, shortPath, nil + } + + store.dbsLock.RUnlock() + // upgrade to write lock + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + // double check after getting the write lock + if db, found := store.dbs[bucket]; found { + return db, bucket, shortPath, nil + } + + // create db + db, err := store.loadDB(bucket) + if err != nil { + return nil, bucket, shortPath, err + } + store.dbs[bucket] = db + + return db, bucket, shortPath, nil +} + +func (store *LevelDB3Store) closeDB(bucket string) { + + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if db, found := store.dbs[bucket]; found { + db.Close() + delete(store.dbs, bucket) + } + +} + +func (store *LevelDB3Store) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *LevelDB3Store) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *LevelDB3Store) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + + db, _, shortPath, err := store.findDB(entry.FullPath, false) + if err != nil { + return fmt.Errorf("findDB %s : %v", entry.FullPath, err) + } + + dir, name := shortPath.DirAndName() + key := genKey(dir, name) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if len(entry.Chunks) > 50 { + value = weed_util.MaybeGzipData(value) + } + + err = db.Put(key, value, nil) + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) + + return nil +} + +func (store *LevelDB3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *LevelDB3Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { + + db, _, shortPath, err := store.findDB(fullpath, false) + if err != nil { + return nil, fmt.Errorf("findDB %s : %v", fullpath, err) + } + + dir, name := shortPath.DirAndName() + key := genKey(dir, name) + + data, err := db.Get(key, nil) + + if err == leveldb.ErrNotFound { + return nil, filer_pb.ErrNotFound + } + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data)) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) + + return entry, nil +} + +func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { + + db, _, shortPath, err := store.findDB(fullpath, false) + if err != nil { + return fmt.Errorf("findDB %s : %v", fullpath, err) + } + + dir, name := shortPath.DirAndName() + key := genKey(dir, name) + + err = db.Delete(key, nil) + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { + + db, bucket, shortPath, err := store.findDB(fullpath, true) + if err != nil { + return fmt.Errorf("findDB %s : %v", fullpath, err) + } + + if bucket != DEFAULT && shortPath == "/" { + store.closeDB(bucket) + if bucket != "" { // just to make sure + os.RemoveAll(store.dir + "/" + bucket) + } + return nil + } + + directoryPrefix := genDirectoryKeyPrefix(shortPath, "") + + batch := new(leveldb.Batch) + + iter := db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + batch.Delete(append(directoryPrefix, []byte(fileName)...)) + } + iter.Release() + + err = db.Write(batch, nil) + + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + +func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + + db, _, shortPath, err := store.findDB(dirPath, true) + if err != nil { + return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) + } + + directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix) + lastFileStart := directoryPrefix + if startFileName != "" { + lastFileStart = genDirectoryKeyPrefix(shortPath, startFileName) + } + + iter := db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + if fileName == startFileName && !includeStartFile { + continue + } + lastFileName = fileName + limit-- + if limit < 0 { + break + } + entry := &filer.Entry{ + FullPath: weed_util.NewFullPath(string(dirPath), fileName), + } + + // println("list", entry.FullPath, "chunks", len(entry.Chunks)) + if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + if !eachEntryFunc(entry) { + break + } + } + iter.Release() + + return lastFileName, err +} + +func genKey(dirPath, fileName string) (key []byte) { + key = hashToBytes(dirPath) + key = append(key, []byte(fileName)...) + return key +} + +func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) { + keyPrefix = hashToBytes(string(fullpath)) + if len(startFileName) > 0 { + keyPrefix = append(keyPrefix, []byte(startFileName)...) + } + return keyPrefix +} + +func getNameFromKey(key []byte) string { + + return string(key[md5.Size:]) + +} + +// hash directory +func hashToBytes(dir string) []byte { + h := md5.New() + io.WriteString(h, dir) + b := h.Sum(nil) + return b +} + +func (store *LevelDB3Store) Shutdown() { + for _, db := range store.dbs { + db.Close() + } +} |
