aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/abstract_sql/abstract_sql_store.go58
-rw-r--r--weed/filer2/cassandra/cassandra_store.go25
-rw-r--r--weed/filer2/entry.go13
-rw-r--r--weed/filer2/entry_codec.go2
-rw-r--r--weed/filer2/filechunks.go16
-rw-r--r--weed/filer2/filer.go107
-rw-r--r--weed/filer2/filer_client_util.go163
-rw-r--r--weed/filer2/filer_deletion.go28
-rw-r--r--weed/filer2/filer_notify.go12
-rw-r--r--weed/filer2/filerstore.go113
-rw-r--r--weed/filer2/fullpath.go13
-rw-r--r--weed/filer2/leveldb/leveldb_store.go32
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go19
-rw-r--r--weed/filer2/leveldb2/leveldb2_store.go208
-rw-r--r--weed/filer2/leveldb2/leveldb2_store_test.go88
-rw-r--r--weed/filer2/memdb/memdb_store.go33
-rw-r--r--weed/filer2/memdb/memdb_store_test.go33
-rw-r--r--weed/filer2/postgres/README.txt4
-rw-r--r--weed/filer2/redis/redis_cluster_store.go6
-rw-r--r--weed/filer2/redis/universal_redis_store.go25
-rw-r--r--weed/filer2/stream.go41
21 files changed, 901 insertions, 138 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
index 5f2990475..3e8554957 100644
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ b/weed/filer2/abstract_sql/abstract_sql_store.go
@@ -1,6 +1,7 @@
package abstract_sql
import (
+ "context"
"database/sql"
"fmt"
@@ -18,7 +19,44 @@ type AbstractSqlStore struct {
SqlListInclusive string
}
-func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) {
+type TxOrDB interface {
+ ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
+ QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
+ QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
+}
+
+func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
+ Isolation: sql.LevelReadCommitted,
+ ReadOnly: false,
+ })
+ if err != nil {
+ return ctx, err
+ }
+
+ return context.WithValue(ctx, "tx", tx), nil
+}
+func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+ return tx.Commit()
+ }
+ return nil
+}
+func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+ return tx.Rollback()
+ }
+ return nil
+}
+
+func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB {
+ if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+ return tx
+ }
+ return store.DB
+}
+
+func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
@@ -26,7 +64,7 @@ func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- res, err := store.DB.Exec(store.SqlInsert, hashToLong(dir), name, dir, meta)
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, hashToLong(dir), name, dir, meta)
if err != nil {
return fmt.Errorf("insert %s: %s", entry.FullPath, err)
}
@@ -38,7 +76,7 @@ func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) {
+func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
@@ -46,7 +84,7 @@ func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- res, err := store.DB.Exec(store.SqlUpdate, meta, hashToLong(dir), name, dir)
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, hashToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("update %s: %s", entry.FullPath, err)
}
@@ -58,10 +96,10 @@ func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entry, error) {
+func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
dir, name := fullpath.DirAndName()
- row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir)
+ row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, hashToLong(dir), name, dir)
var data []byte
if err := row.Scan(&data); err != nil {
return nil, filer2.ErrNotFound
@@ -77,11 +115,11 @@ func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entr
return entry, nil
}
-func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error {
+func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
dir, name := fullpath.DirAndName()
- res, err := store.DB.Exec(store.SqlDelete, hashToLong(dir), name, dir)
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, hashToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("delete %s: %s", fullpath, err)
}
@@ -94,14 +132,14 @@ func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error {
return nil
}
-func (store *AbstractSqlStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
+func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive
if inclusive {
sqlText = store.SqlListInclusive
}
- rows, err := store.DB.Query(sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit)
+ rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit)
if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
}
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
index 2c1f03182..466be5bf3 100644
--- a/weed/filer2/cassandra/cassandra_store.go
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -1,6 +1,7 @@
package cassandra
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -39,7 +40,17 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string) (err er
return
}
-func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) {
+func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
@@ -56,12 +67,12 @@ func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *CassandraStore) UpdateEntry(entry *filer2.Entry) (err error) {
+func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return store.InsertEntry(entry)
+ return store.InsertEntry(ctx, entry)
}
-func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
dir, name := fullpath.DirAndName()
var data []byte
@@ -74,7 +85,7 @@ func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.
}
if len(data) == 0 {
- return nil, fmt.Errorf("not found: %s", fullpath)
+ return nil, filer2.ErrNotFound
}
entry = &filer2.Entry{
@@ -88,7 +99,7 @@ func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.
return entry, nil
}
-func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error {
+func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
dir, name := fullpath.DirAndName()
@@ -101,7 +112,7 @@ func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error {
return nil
}
-func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index f17a11727..3f8a19114 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -52,9 +52,20 @@ func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
return nil
}
return &filer_pb.Entry{
- Name: string(entry.FullPath),
+ Name: entry.FullPath.Name(),
IsDirectory: entry.IsDirectory(),
Attributes: EntryAttributeToPb(entry),
Chunks: entry.Chunks,
}
}
+
+func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
+ if entry == nil {
+ return nil
+ }
+ dir, _ := entry.FullPath.DirAndName()
+ return &filer_pb.FullEntry{
+ Dir: dir,
+ Entry: entry.ToProtoEntry(),
+ }
+}
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
index e50b3fa9a..cf4627b74 100644
--- a/weed/filer2/entry_codec.go
+++ b/weed/filer2/entry_codec.go
@@ -6,7 +6,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/gogo/protobuf/proto"
+ "github.com/golang/protobuf/proto"
)
func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index 6c3157e6c..b5876df82 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -40,7 +40,7 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
fileIds[interval.fileId] = true
}
for _, chunk := range chunks {
- if found := fileIds[chunk.FileId]; found {
+ if _, found := fileIds[chunk.GetFileIdString()]; found {
compacted = append(compacted, chunk)
} else {
garbage = append(garbage, chunk)
@@ -50,15 +50,15 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
return
}
-func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) {
+func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
fileIds := make(map[string]bool)
- for _, interval := range newChunks {
- fileIds[interval.FileId] = true
+ for _, interval := range bs {
+ fileIds[interval.GetFileIdString()] = true
}
- for _, chunk := range oldChunks {
- if found := fileIds[chunk.FileId]; !found {
- unused = append(unused, chunk)
+ for _, chunk := range as {
+ if _, found := fileIds[chunk.GetFileIdString()]; !found {
+ delta = append(delta, chunk)
}
}
@@ -123,7 +123,7 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
newV := newVisibleInterval(
chunk.Offset,
chunk.Offset+int64(chunk.Size),
- chunk.FileId,
+ chunk.GetFileIdString(),
chunk.Mtime,
true,
)
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 1ee2f5ede..cf236b74d 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -3,6 +3,7 @@ package filer2
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"math"
"os"
"path/filepath"
@@ -20,17 +21,19 @@ var (
)
type Filer struct {
- store FilerStore
+ store *FilerStoreWrapper
directoryCache *ccache.Cache
MasterClient *wdclient.MasterClient
fileIdDeletionChan chan string
+ GrpcDialOption grpc.DialOption
}
-func NewFiler(masters []string) *Filer {
+func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer {
f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
- MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
+ MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
fileIdDeletionChan: make(chan string, 4096),
+ GrpcDialOption: grpcDialOption,
}
go f.loopProcessingDeletion()
@@ -39,7 +42,7 @@ func NewFiler(masters []string) *Filer {
}
func (f *Filer) SetStore(store FilerStore) {
- f.store = store
+ f.store = NewFilerStoreWrapper(store)
}
func (f *Filer) DisableDirectoryCache() {
@@ -54,7 +57,19 @@ func (fs *Filer) KeepConnectedToMaster() {
fs.MasterClient.KeepConnectedToMaster()
}
-func (f *Filer) CreateEntry(entry *Entry) error {
+func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return f.store.BeginTransaction(ctx)
+}
+
+func (f *Filer) CommitTransaction(ctx context.Context) error {
+ return f.store.CommitTransaction(ctx)
+}
+
+func (f *Filer) RollbackTransaction(ctx context.Context) error {
+ return f.store.RollbackTransaction(ctx)
+}
+
+func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
if string(entry.FullPath) == "/" {
return nil
@@ -67,7 +82,7 @@ func (f *Filer) CreateEntry(entry *Entry) error {
var lastDirectoryEntry *Entry
for i := 1; i < len(dirParts); i++ {
- dirPath := "/" + filepath.Join(dirParts[:i]...)
+ dirPath := "/" + filepath.ToSlash(filepath.Join(dirParts[:i]...))
// fmt.Printf("%d directory: %+v\n", i, dirPath)
// first check local cache
@@ -76,7 +91,7 @@ func (f *Filer) CreateEntry(entry *Entry) error {
// not found, check the store directly
if dirEntry == nil {
glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ = f.FindEntry(FullPath(dirPath))
+ dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath))
} else {
glog.V(4).Infof("found cached directory: %s", dirPath)
}
@@ -99,9 +114,9 @@ func (f *Filer) CreateEntry(entry *Entry) error {
}
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
- mkdirErr := f.store.InsertEntry(dirEntry)
+ mkdirErr := f.store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil {
- if _, err := f.FindEntry(FullPath(dirPath)); err == ErrNotFound {
+ if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound {
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
@@ -134,14 +149,16 @@ func (f *Filer) CreateEntry(entry *Entry) error {
}
*/
- oldEntry, _ := f.FindEntry(entry.FullPath)
+ oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
if oldEntry == nil {
- if err := f.store.InsertEntry(entry); err != nil {
+ if err := f.store.InsertEntry(ctx, entry); err != nil {
+ glog.Errorf("insert entry %s: %v", entry.FullPath, err)
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
} else {
- if err := f.UpdateEntry(oldEntry, entry); err != nil {
+ if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
+ glog.Errorf("update entry %s: %v", entry.FullPath, err)
return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
}
}
@@ -153,19 +170,21 @@ func (f *Filer) CreateEntry(entry *Entry) error {
return nil
}
-func (f *Filer) UpdateEntry(oldEntry, entry *Entry) (err error) {
+func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil {
if oldEntry.IsDirectory() && !entry.IsDirectory() {
+ glog.Errorf("existing %s is a directory", entry.FullPath)
return fmt.Errorf("existing %s is a directory", entry.FullPath)
}
if !oldEntry.IsDirectory() && entry.IsDirectory() {
+ glog.Errorf("existing %s is a file", entry.FullPath)
return fmt.Errorf("existing %s is a file", entry.FullPath)
}
}
- return f.store.UpdateEntry(entry)
+ return f.store.UpdateEntry(ctx, entry)
}
-func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) {
+func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) {
now := time.Now()
@@ -181,11 +200,11 @@ func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) {
},
}, nil
}
- return f.store.FindEntry(p)
+ return f.store.FindEntry(ctx, p)
}
-func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDeleteChunks bool) (err error) {
- entry, err := f.FindEntry(p)
+func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, shouldDeleteChunks bool) (err error) {
+ entry, err := f.FindEntry(ctx, p)
if err != nil {
return err
}
@@ -198,37 +217,41 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet
lastFileName := ""
includeLastFile := false
for limit > 0 {
- entries, err := f.ListDirectoryEntries(p, lastFileName, includeLastFile, 1024)
+ entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, 1024)
if err != nil {
+ glog.Errorf("list folder %s: %v", p, err)
return fmt.Errorf("list folder %s: %v", p, err)
}
+
if len(entries) == 0 {
break
- } else {
- if isRecursive {
- for _, sub := range entries {
- lastFileName = sub.Name()
- f.DeleteEntryMetaAndData(sub.FullPath, isRecursive, shouldDeleteChunks)
- limit--
- if limit <= 0 {
- break
- }
+ }
+
+ if isRecursive {
+ for _, sub := range entries {
+ lastFileName = sub.Name()
+ err = f.DeleteEntryMetaAndData(ctx, sub.FullPath, isRecursive, shouldDeleteChunks)
+ if err != nil {
+ return err
}
- } else {
- if len(entries) > 0 {
- return fmt.Errorf("folder %s is not empty", p)
+ limit--
+ if limit <= 0 {
+ break
}
}
- f.cacheDelDirectory(string(p))
- if len(entries) < 1024 {
- break
- }
+ }
+
+ if len(entries) < 1024 {
+ break
}
}
+
+ f.cacheDelDirectory(string(p))
+
}
if shouldDeleteChunks {
- f.DeleteChunks(entry.Chunks)
+ f.DeleteChunks(p, entry.Chunks)
}
if p == "/" {
@@ -238,17 +261,22 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet
f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
- return f.store.DeleteEntry(p)
+ return f.store.DeleteEntry(ctx, p)
}
-func (f *Filer) ListDirectoryEntries(p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
- return f.store.ListDirectoryEntries(p, startFileName, inclusive, limit)
+ return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
}
func (f *Filer) cacheDelDirectory(dirpath string) {
+
+ if dirpath == "/" {
+ return
+ }
+
if f.directoryCache == nil {
return
}
@@ -257,6 +285,7 @@ func (f *Filer) cacheDelDirectory(dirpath string) {
}
func (f *Filer) cacheGetDirectory(dirpath string) *Entry {
+
if f.directoryCache == nil {
return nil
}
diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go
new file mode 100644
index 000000000..7e093eea2
--- /dev/null
+++ b/weed/filer2/filer_client_util.go
@@ -0,0 +1,163 @@
+package filer2
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func VolumeId(fileId string) string {
+ lastCommaIndex := strings.LastIndex(fileId, ",")
+ if lastCommaIndex > 0 {
+ return fileId[:lastCommaIndex]
+ }
+ return fileId
+}
+
+type FilerClient interface {
+ WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error
+}
+
+func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
+ var vids []string
+ for _, chunkView := range chunkViews {
+ vids = append(vids, VolumeId(chunkView.FileId))
+ }
+
+ vid2Locations := make(map[string]*filer_pb.Locations)
+
+ err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+
+ glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
+ resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ VolumeIds: vids,
+ })
+ if err != nil {
+ return err
+ }
+
+ vid2Locations = resp.LocationsMap
+
+ return nil
+ })
+
+ if err != nil {
+ return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
+ }
+
+ var wg sync.WaitGroup
+ for _, chunkView := range chunkViews {
+ wg.Add(1)
+ go func(chunkView *ChunkView) {
+ defer wg.Done()
+
+ glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
+
+ locations := vid2Locations[VolumeId(chunkView.FileId)]
+ if locations == nil || len(locations.Locations) == 0 {
+ glog.V(0).Infof("failed to locate %s", chunkView.FileId)
+ err = fmt.Errorf("failed to locate %s", chunkView.FileId)
+ return
+ }
+
+ var n int64
+ n, err = util.ReadUrl(
+ fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
+ chunkView.Offset,
+ int(chunkView.Size),
+ buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
+ !chunkView.IsFullChunk)
+
+ if err != nil {
+
+ glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err)
+
+ err = fmt.Errorf("failed to read http://%s/%s: %v",
+ locations.Locations[0].Url, chunkView.FileId, err)
+ return
+ }
+
+ glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
+ totalRead += n
+
+ }(chunkView)
+ }
+ wg.Wait()
+ return
+}
+
+func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) {
+
+ dir, name := FullPath(fullFilePath).DirAndName()
+
+ err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Directory: dir,
+ Name: name,
+ }
+
+ glog.V(3).Infof("read %s request: %v", fullFilePath, request)
+ resp, err := client.LookupDirectoryEntry(ctx, request)
+ if err != nil {
+ if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
+ return nil
+ }
+ glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err)
+ return err
+ }
+
+ if resp.Entry != nil {
+ entry = resp.Entry
+ }
+
+ return nil
+ })
+
+ return
+}
+
+func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) {
+
+ err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+
+ paginationLimit := 1024
+
+ lastEntryName := ""
+
+ for {
+
+ request := &filer_pb.ListEntriesRequest{
+ Directory: fullDirPath,
+ StartFromFileName: lastEntryName,
+ Limit: uint32(paginationLimit),
+ }
+
+ glog.V(3).Infof("read directory: %v", request)
+ resp, err := client.ListEntries(ctx, request)
+ if err != nil {
+ return fmt.Errorf("list %s: %v", fullDirPath, err)
+ }
+
+ for _, entry := range resp.Entries {
+ fn(entry)
+ lastEntryName = entry.Name
+ }
+
+ if len(resp.Entries) < paginationLimit {
+ break
+ }
+
+ }
+
+ return nil
+
+ })
+
+ return
+}
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index 8fe8ae04f..fea93d57f 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -38,25 +38,28 @@ func (f *Filer) loopProcessingDeletion() {
fileIds = append(fileIds, fid)
if len(fileIds) >= 4096 {
glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
fileIds = fileIds[:0]
}
case <-ticker.C:
if len(fileIds) > 0 {
glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
fileIds = fileIds[:0]
}
}
}
}
-func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
+func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
- f.fileIdDeletionChan <- chunk.FileId
+ glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String())
+ f.fileIdDeletionChan <- chunk.GetFileIdString()
}
}
+// DeleteFileByFileId direct delete by file id.
+// Only used when the fileId is not being managed by snapshots.
func (f *Filer) DeleteFileByFileId(fileId string) {
f.fileIdDeletionChan <- fileId
}
@@ -67,22 +70,19 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
return
}
if newEntry == nil {
- f.DeleteChunks(oldEntry.Chunks)
+ f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks)
}
var toDelete []*filer_pb.FileChunk
+ newChunkIds := make(map[string]bool)
+ for _, newChunk := range newEntry.Chunks {
+ newChunkIds[newChunk.GetFileIdString()] = true
+ }
for _, oldChunk := range oldEntry.Chunks {
- found := false
- for _, newChunk := range newEntry.Chunks {
- if oldChunk.FileId == newChunk.FileId {
- found = true
- break
- }
- }
- if !found {
+ if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
toDelete = append(toDelete, oldChunk)
}
}
- f.DeleteChunks(toDelete)
+ f.DeleteChunks(oldEntry.FullPath, toDelete)
}
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index b3c215249..c37381116 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -20,12 +20,18 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
glog.V(3).Infof("notifying entry update %v", key)
+ newParentPath := ""
+ if newEntry != nil {
+ newParentPath, _ = newEntry.FullPath.DirAndName()
+ }
+
notification.Queue.SendMessage(
key,
&filer_pb.EventNotification{
- OldEntry: oldEntry.ToProtoEntry(),
- NewEntry: newEntry.ToProtoEntry(),
- DeleteChunks: deleteChunks,
+ OldEntry: oldEntry.ToProtoEntry(),
+ NewEntry: newEntry.ToProtoEntry(),
+ DeleteChunks: deleteChunks,
+ NewParentPath: newParentPath,
},
)
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index 9ef1d9d48..231c7fc68 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -1,7 +1,12 @@
package filer2
import (
+ "context"
"errors"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -10,12 +15,110 @@ type FilerStore interface {
GetName() string
// Initialize initializes the file store
Initialize(configuration util.Configuration) error
- InsertEntry(*Entry) error
- UpdateEntry(*Entry) (err error)
+ InsertEntry(context.Context, *Entry) error
+ UpdateEntry(context.Context, *Entry) (err error)
// err == filer2.ErrNotFound if not found
- FindEntry(FullPath) (entry *Entry, err error)
- DeleteEntry(FullPath) (err error)
- ListDirectoryEntries(dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
+ FindEntry(context.Context, FullPath) (entry *Entry, err error)
+ DeleteEntry(context.Context, FullPath) (err error)
+ ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
+
+ BeginTransaction(ctx context.Context) (context.Context, error)
+ CommitTransaction(ctx context.Context) error
+ RollbackTransaction(ctx context.Context) error
}
var ErrNotFound = errors.New("filer: no entry is found in filer store")
+
+type FilerStoreWrapper struct {
+ actualStore FilerStore
+}
+
+func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
+ return &FilerStoreWrapper{
+ actualStore: store,
+ }
+}
+
+func (fsw *FilerStoreWrapper) GetName() string {
+ return fsw.actualStore.GetName()
+}
+
+func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error {
+ return fsw.actualStore.Initialize(configuration)
+}
+
+func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
+ stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "insert").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
+ }()
+
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
+ return fsw.actualStore.InsertEntry(ctx, entry)
+}
+
+func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
+ stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "update").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
+ }()
+
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
+ return fsw.actualStore.UpdateEntry(ctx, entry)
+}
+
+func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
+ }()
+
+ entry, err = fsw.actualStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return
+}
+
+func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ return fsw.actualStore.DeleteEntry(ctx, fp)
+}
+
+func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
+ }()
+
+ entries, err := fsw.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ }
+ return entries, err
+}
+
+func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return fsw.actualStore.BeginTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
+ return fsw.actualStore.CommitTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
+ return fsw.actualStore.RollbackTransaction(ctx)
+}
diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go
index be6e34431..191e51cf3 100644
--- a/weed/filer2/fullpath.go
+++ b/weed/filer2/fullpath.go
@@ -8,10 +8,7 @@ import (
type FullPath string
func NewFullPath(dir, name string) FullPath {
- if strings.HasSuffix(dir, "/") {
- return FullPath(dir + name)
- }
- return FullPath(dir + "/" + name)
+ return FullPath(dir).Child(name)
}
func (fp FullPath) DirAndName() (string, string) {
@@ -29,3 +26,11 @@ func (fp FullPath) Name() string {
_, name := filepath.Split(string(fp))
return name
}
+
+func (fp FullPath) Child(name string) FullPath {
+ dir := string(fp)
+ if strings.HasSuffix(dir, "/") {
+ return FullPath(dir + name)
+ }
+ return FullPath(dir + "/" + name)
+}
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index 179107e2c..d00eba859 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -2,12 +2,14 @@ package leveldb
import (
"bytes"
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
)
@@ -38,14 +40,30 @@ func (store *LevelDBStore) initialize(dir string) (err error) {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
- if store.db, err = leveldb.OpenFile(dir, nil); err != nil {
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10,
+ }
+
+ if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
glog.Infof("filer store open dir %s: %v", dir, err)
return
}
return
}
-func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) {
+func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *LevelDBStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
key := genKey(entry.DirAndName())
value, err := entry.EncodeAttributesAndChunks()
@@ -64,12 +82,12 @@ func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *LevelDBStore) UpdateEntry(entry *filer2.Entry) (err error) {
+func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return store.InsertEntry(entry)
+ return store.InsertEntry(ctx, entry)
}
-func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
key := genKey(fullpath.DirAndName())
data, err := store.db.Get(key, nil)
@@ -94,7 +112,7 @@ func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.En
return entry, nil
}
-func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
+func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
key := genKey(fullpath.DirAndName())
err = store.db.Delete(key, nil)
@@ -105,7 +123,7 @@ func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
return nil
}
-func (store *LevelDBStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 5b214558f..904de8c97 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -1,6 +1,7 @@
package leveldb
import (
+ "context"
"github.com/chrislusf/seaweedfs/weed/filer2"
"io/ioutil"
"os"
@@ -8,7 +9,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil)
+ filer := filer2.NewFiler(nil, nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -18,6 +19,8 @@ func TestCreateAndFind(t *testing.T) {
fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
+ ctx := context.Background()
+
entry1 := &filer2.Entry{
FullPath: fullpath,
Attr: filer2.Attr{
@@ -27,12 +30,12 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := filer.CreateEntry(entry1); err != nil {
+ if err := filer.CreateEntry(ctx, entry1); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
- entry, err := filer.FindEntry(fullpath)
+ entry, err := filer.FindEntry(ctx, fullpath)
if err != nil {
t.Errorf("find entry: %v", err)
@@ -45,14 +48,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
+ entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/"), "", false, 100)
+ entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -61,7 +64,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil)
+ filer := filer2.NewFiler(nil, nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -69,8 +72,10 @@ func TestEmptyRoot(t *testing.T) {
filer.SetStore(store)
filer.DisableDirectoryCache()
+ ctx := context.Background()
+
// checking one upper directory
- entries, err := filer.ListDirectoryEntries(filer2.FullPath("/"), "", false, 100)
+ entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go
new file mode 100644
index 000000000..bce81e357
--- /dev/null
+++ b/weed/filer2/leveldb2/leveldb2_store.go
@@ -0,0 +1,208 @@
+package leveldb
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &LevelDB2Store{})
+}
+
+type LevelDB2Store struct {
+ dbs []*leveldb.DB
+ dbCount int
+}
+
+func (store *LevelDB2Store) GetName() string {
+ return "leveldb2"
+}
+
+func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration) (err error) {
+ dir := configuration.GetString("dir")
+ return store.initialize(dir, 8)
+}
+
+func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
+ glog.Infof("filer store leveldb2 dir: %s", dir)
+ if err := weed_util.TestFolderWritable(dir); err != nil {
+ return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+ }
+
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 4,
+ }
+
+ for d := 0; d < dbCount; d++ {
+ dbFolder := fmt.Sprintf("%s/%02d", dir, d)
+ os.MkdirAll(dbFolder, 0755)
+ db, dbErr := leveldb.OpenFile(dbFolder, opts)
+ if dbErr != nil {
+ glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
+ return dbErr
+ }
+ store.dbs = append(store.dbs, db)
+ }
+ store.dbCount = dbCount
+
+ return
+}
+
+func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ dir, name := entry.DirAndName()
+ key, partitionId := genKey(dir, name, store.dbCount)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ err = store.dbs[partitionId].Put(key, value, nil)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+ dir, name := fullpath.DirAndName()
+ key, partitionId := genKey(dir, name, store.dbCount)
+
+ data, err := store.dbs[partitionId].Get(key, nil)
+
+ if err == leveldb.ErrNotFound {
+ return nil, filer2.ErrNotFound
+ }
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ dir, name := fullpath.DirAndName()
+ key, partitionId := genKey(dir, name, store.dbCount)
+
+ err = store.dbs[partitionId].Delete(key, nil)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer2.Entry, err error) {
+
+ directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
+ lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
+
+ iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ if fileName == startFileName && !inclusive {
+ continue
+ }
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer2.Entry{
+ FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ }
+
+ // println("list", entry.FullPath, "chunks", len(entry.Chunks))
+
+ if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+ iter.Release()
+
+ return entries, err
+}
+
+func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) {
+ key, partitionId = hashToBytes(dirPath, dbCount)
+ key = append(key, []byte(fileName)...)
+ return key, partitionId
+}
+
+func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
+ keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount)
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix, partitionId
+}
+
+func getNameFromKey(key []byte) string {
+
+ return string(key[md5.Size:])
+
+}
+
+// hash directory, and use last byte for partitioning
+func hashToBytes(dir string, dbCount int) ([]byte, int) {
+ h := md5.New()
+ io.WriteString(h, dir)
+
+ b := h.Sum(nil)
+
+ x := b[len(b)-1]
+
+ return b, int(x) % dbCount
+}
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
new file mode 100644
index 000000000..a16803ca1
--- /dev/null
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -0,0 +1,88 @@
+package leveldb
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "io/ioutil"
+ "os"
+ "testing"
+)
+
+func TestCreateAndFind(t *testing.T) {
+ filer := filer2.NewFiler(nil, nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+ defer os.RemoveAll(dir)
+ store := &LevelDB2Store{}
+ store.initialize(dir, 2)
+ filer.SetStore(store)
+ filer.DisableDirectoryCache()
+
+ fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
+
+ ctx := context.Background()
+
+ entry1 := &filer2.Entry{
+ FullPath: fullpath,
+ Attr: filer2.Attr{
+ Mode: 0440,
+ Uid: 1234,
+ Gid: 5678,
+ },
+ }
+
+ if err := filer.CreateEntry(ctx, entry1); err != nil {
+ t.Errorf("create entry %v: %v", entry1.FullPath, err)
+ return
+ }
+
+ entry, err := filer.FindEntry(ctx, fullpath)
+
+ if err != nil {
+ t.Errorf("find entry: %v", err)
+ return
+ }
+
+ if entry.FullPath != entry1.FullPath {
+ t.Errorf("find wrong entry: %v", entry.FullPath)
+ return
+ }
+
+ // checking one upper directory
+ entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+ // checking one upper directory
+ entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+ filer := filer2.NewFiler(nil, nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+ defer os.RemoveAll(dir)
+ store := &LevelDB2Store{}
+ store.initialize(dir, 2)
+ filer.SetStore(store)
+ filer.DisableDirectoryCache()
+
+ ctx := context.Background()
+
+ // checking one upper directory
+ entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ if err != nil {
+ t.Errorf("list entries: %v", err)
+ return
+ }
+ if len(entries) != 0 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go
index 062f1cd1c..9c10a5472 100644
--- a/weed/filer2/memdb/memdb_store.go
+++ b/weed/filer2/memdb/memdb_store.go
@@ -1,11 +1,13 @@
package memdb
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/google/btree"
"strings"
+ "sync"
)
func init() {
@@ -13,7 +15,8 @@ func init() {
}
type MemDbStore struct {
- tree *btree.BTree
+ tree *btree.BTree
+ treeLock sync.Mutex
}
type entryItem struct {
@@ -33,21 +36,35 @@ func (store *MemDbStore) Initialize(configuration util.Configuration) (err error
return nil
}
-func (store *MemDbStore) InsertEntry(entry *filer2.Entry) (err error) {
+func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *MemDbStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *MemDbStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
// println("inserting", entry.FullPath)
+ store.treeLock.Lock()
store.tree.ReplaceOrInsert(entryItem{entry})
+ store.treeLock.Unlock()
return nil
}
-func (store *MemDbStore) UpdateEntry(entry *filer2.Entry) (err error) {
- if _, err = store.FindEntry(entry.FullPath); err != nil {
+func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ if _, err = store.FindEntry(ctx, entry.FullPath); err != nil {
return fmt.Errorf("no such file %s : %v", entry.FullPath, err)
}
+ store.treeLock.Lock()
store.tree.ReplaceOrInsert(entryItem{entry})
+ store.treeLock.Unlock()
return nil
}
-func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}})
if item == nil {
return nil, filer2.ErrNotFound
@@ -56,12 +73,14 @@ func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entr
return entry, nil
}
-func (store *MemDbStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
+func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ store.treeLock.Lock()
store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}})
+ store.treeLock.Unlock()
return nil
}
-func (store *MemDbStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
+func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
startFrom := string(fullpath)
if startFileName != "" {
diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go
index cf813e04b..d823c5177 100644
--- a/weed/filer2/memdb/memdb_store_test.go
+++ b/weed/filer2/memdb/memdb_store_test.go
@@ -1,17 +1,20 @@
package memdb
import (
+ "context"
"github.com/chrislusf/seaweedfs/weed/filer2"
"testing"
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil)
+ filer := filer2.NewFiler(nil, nil)
store := &MemDbStore{}
store.Initialize(nil)
filer.SetStore(store)
filer.DisableDirectoryCache()
+ ctx := context.Background()
+
fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
entry1 := &filer2.Entry{
@@ -23,12 +26,12 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := filer.CreateEntry(entry1); err != nil {
+ if err := filer.CreateEntry(ctx, entry1); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
- entry, err := filer.FindEntry(fullpath)
+ entry, err := filer.FindEntry(ctx, fullpath)
if err != nil {
t.Errorf("find entry: %v", err)
@@ -43,12 +46,14 @@ func TestCreateAndFind(t *testing.T) {
}
func TestCreateFileAndList(t *testing.T) {
- filer := filer2.NewFiler(nil)
+ filer := filer2.NewFiler(nil, nil)
store := &MemDbStore{}
store.Initialize(nil)
filer.SetStore(store)
filer.DisableDirectoryCache()
+ ctx := context.Background()
+
entry1 := &filer2.Entry{
FullPath: filer2.FullPath("/home/chris/this/is/one/file1.jpg"),
Attr: filer2.Attr{
@@ -67,11 +72,11 @@ func TestCreateFileAndList(t *testing.T) {
},
}
- filer.CreateEntry(entry1)
- filer.CreateEntry(entry2)
+ filer.CreateEntry(ctx, entry1)
+ filer.CreateEntry(ctx, entry2)
// checking the 2 files
- entries, err := filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is/one/"), "", false, 100)
+ entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "", false, 100)
if err != nil {
t.Errorf("list entries: %v", err)
@@ -94,21 +99,21 @@ func TestCreateFileAndList(t *testing.T) {
}
// checking the offset
- entries, err = filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100)
+ entries, err = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is"), "", false, 100)
+ entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking root directory
- entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/"), "", false, 100)
+ entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -124,18 +129,18 @@ func TestCreateFileAndList(t *testing.T) {
Gid: 5678,
},
}
- filer.CreateEntry(entry3)
+ filer.CreateEntry(ctx, entry3)
// checking one upper directory
- entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is"), "", false, 100)
+ entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
if len(entries) != 2 {
t.Errorf("list entries count: %v", len(entries))
return
}
// delete file and count
- filer.DeleteEntryMetaAndData(file3Path, false, false)
- entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is"), "", false, 100)
+ filer.DeleteEntryMetaAndData(ctx, file3Path, false, false)
+ entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
diff --git a/weed/filer2/postgres/README.txt b/weed/filer2/postgres/README.txt
index ef2ef683b..cb0c99c63 100644
--- a/weed/filer2/postgres/README.txt
+++ b/weed/filer2/postgres/README.txt
@@ -9,8 +9,8 @@ $PGHOME/bin/psql --username=postgres --password seaweedfs
CREATE TABLE IF NOT EXISTS filemeta (
dirhash BIGINT,
- name VARCHAR(1000),
- directory VARCHAR(4096),
+ name VARCHAR(65535),
+ directory VARCHAR(65535),
meta bytea,
PRIMARY KEY (dirhash, name)
);
diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go
index 4f74a8a22..11c315391 100644
--- a/weed/filer2/redis/redis_cluster_store.go
+++ b/weed/filer2/redis/redis_cluster_store.go
@@ -21,12 +21,14 @@ func (store *RedisClusterStore) GetName() string {
func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) {
return store.initialize(
configuration.GetStringSlice("addresses"),
+ configuration.GetString("password"),
)
}
-func (store *RedisClusterStore) initialize(addresses []string) (err error) {
+func (store *RedisClusterStore) initialize(addresses []string, password string) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: addresses,
+ Addrs: addresses,
+ Password: password,
})
return
}
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
index 7fd7e1180..ce41d4d70 100644
--- a/weed/filer2/redis/universal_redis_store.go
+++ b/weed/filer2/redis/universal_redis_store.go
@@ -1,6 +1,7 @@
package redis
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -18,7 +19,17 @@ type UniversalRedisStore struct {
Client redis.UniversalClient
}
-func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) {
+func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
@@ -42,12 +53,12 @@ func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *UniversalRedisStore) UpdateEntry(entry *filer2.Entry) (err error) {
+func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return store.InsertEntry(entry)
+ return store.InsertEntry(ctx, entry)
}
-func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
data, err := store.Client.Get(string(fullpath)).Result()
if err == redis.Nil {
@@ -69,7 +80,7 @@ func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *fi
return entry, nil
}
-func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
+func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
_, err = store.Client.Del(string(fullpath)).Result()
@@ -88,7 +99,7 @@ func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err err
return nil
}
-func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
@@ -126,7 +137,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath,
// fetch entry meta
for _, fileName := range members {
path := filer2.NewFullPath(string(fullpath), fileName)
- entry, err := store.FindEntry(path)
+ entry, err := store.FindEntry(ctx, path)
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
} else {
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
new file mode 100644
index 000000000..01b87cad1
--- /dev/null
+++ b/weed/filer2/stream.go
@@ -0,0 +1,41 @@
+package filer2
+
+import (
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error {
+
+ chunkViews := ViewFromChunks(chunks, offset, size)
+
+ fileId2Url := make(map[string]string)
+
+ for _, chunkView := range chunkViews {
+
+ urlString, err := masterClient.LookupFileId(chunkView.FileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ fileId2Url[chunkView.FileId] = urlString
+ }
+
+ for _, chunkView := range chunkViews {
+ urlString := fileId2Url[chunkView.FileId]
+ _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ w.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ }
+
+ return nil
+
+}