diff options
Diffstat (limited to 'weed/filer2')
| -rw-r--r-- | weed/filer2/abstract_sql/abstract_sql_store.go | 58 | ||||
| -rw-r--r-- | weed/filer2/cassandra/cassandra_store.go | 25 | ||||
| -rw-r--r-- | weed/filer2/entry.go | 13 | ||||
| -rw-r--r-- | weed/filer2/entry_codec.go | 2 | ||||
| -rw-r--r-- | weed/filer2/filechunks.go | 16 | ||||
| -rw-r--r-- | weed/filer2/filer.go | 107 | ||||
| -rw-r--r-- | weed/filer2/filer_client_util.go | 163 | ||||
| -rw-r--r-- | weed/filer2/filer_deletion.go | 28 | ||||
| -rw-r--r-- | weed/filer2/filer_notify.go | 12 | ||||
| -rw-r--r-- | weed/filer2/filerstore.go | 113 | ||||
| -rw-r--r-- | weed/filer2/fullpath.go | 13 | ||||
| -rw-r--r-- | weed/filer2/leveldb/leveldb_store.go | 32 | ||||
| -rw-r--r-- | weed/filer2/leveldb/leveldb_store_test.go | 19 | ||||
| -rw-r--r-- | weed/filer2/leveldb2/leveldb2_store.go | 208 | ||||
| -rw-r--r-- | weed/filer2/leveldb2/leveldb2_store_test.go | 88 | ||||
| -rw-r--r-- | weed/filer2/memdb/memdb_store.go | 33 | ||||
| -rw-r--r-- | weed/filer2/memdb/memdb_store_test.go | 33 | ||||
| -rw-r--r-- | weed/filer2/postgres/README.txt | 4 | ||||
| -rw-r--r-- | weed/filer2/redis/redis_cluster_store.go | 6 | ||||
| -rw-r--r-- | weed/filer2/redis/universal_redis_store.go | 25 | ||||
| -rw-r--r-- | weed/filer2/stream.go | 41 |
21 files changed, 901 insertions, 138 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go index 5f2990475..3e8554957 100644 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ b/weed/filer2/abstract_sql/abstract_sql_store.go @@ -1,6 +1,7 @@ package abstract_sql import ( + "context" "database/sql" "fmt" @@ -18,7 +19,44 @@ type AbstractSqlStore struct { SqlListInclusive string } -func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) { +type TxOrDB interface { + ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) + QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) +} + +func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) { + tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + ReadOnly: false, + }) + if err != nil { + return ctx, err + } + + return context.WithValue(ctx, "tx", tx), nil +} +func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx.Commit() + } + return nil +} +func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx.Rollback() + } + return nil +} + +func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx + } + return store.DB +} + +func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -26,7 +64,7 @@ func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) { return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := store.DB.Exec(store.SqlInsert, hashToLong(dir), name, dir, meta) + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, hashToLong(dir), name, dir, meta) if err != nil { return fmt.Errorf("insert %s: %s", entry.FullPath, err) } @@ -38,7 +76,7 @@ func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) { return nil } -func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) { +func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -46,7 +84,7 @@ func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) { return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := store.DB.Exec(store.SqlUpdate, meta, hashToLong(dir), name, dir) + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, hashToLong(dir), name, dir) if err != nil { return fmt.Errorf("update %s: %s", entry.FullPath, err) } @@ -58,10 +96,10 @@ func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) { return nil } -func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entry, error) { +func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) { dir, name := fullpath.DirAndName() - row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir) + row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, hashToLong(dir), name, dir) var data []byte if err := row.Scan(&data); err != nil { return nil, filer2.ErrNotFound @@ -77,11 +115,11 @@ func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entr return entry, nil } -func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error { +func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { dir, name := fullpath.DirAndName() - res, err := store.DB.Exec(store.SqlDelete, hashToLong(dir), name, dir) + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, hashToLong(dir), name, dir) if err != nil { return fmt.Errorf("delete %s: %s", fullpath, err) } @@ -94,14 +132,14 @@ func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error { return nil } -func (store *AbstractSqlStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { +func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { sqlText := store.SqlListExclusive if inclusive { sqlText = store.SqlListInclusive } - rows, err := store.DB.Query(sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit) + rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit) if err != nil { return nil, fmt.Errorf("list %s : %v", fullpath, err) } diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go index 2c1f03182..466be5bf3 100644 --- a/weed/filer2/cassandra/cassandra_store.go +++ b/weed/filer2/cassandra/cassandra_store.go @@ -1,6 +1,7 @@ package cassandra import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -39,7 +40,17 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string) (err er return } -func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) { +func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *CassandraStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *CassandraStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() @@ -56,12 +67,12 @@ func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) { return nil } -func (store *CassandraStore) UpdateEntry(entry *filer2.Entry) (err error) { +func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return store.InsertEntry(entry) + return store.InsertEntry(ctx, entry) } -func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { dir, name := fullpath.DirAndName() var data []byte @@ -74,7 +85,7 @@ func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2. } if len(data) == 0 { - return nil, fmt.Errorf("not found: %s", fullpath) + return nil, filer2.ErrNotFound } entry = &filer2.Entry{ @@ -88,7 +99,7 @@ func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2. return entry, nil } -func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error { +func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { dir, name := fullpath.DirAndName() @@ -101,7 +112,7 @@ func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error { return nil } -func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go index f17a11727..3f8a19114 100644 --- a/weed/filer2/entry.go +++ b/weed/filer2/entry.go @@ -52,9 +52,20 @@ func (entry *Entry) ToProtoEntry() *filer_pb.Entry { return nil } return &filer_pb.Entry{ - Name: string(entry.FullPath), + Name: entry.FullPath.Name(), IsDirectory: entry.IsDirectory(), Attributes: EntryAttributeToPb(entry), Chunks: entry.Chunks, } } + +func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry { + if entry == nil { + return nil + } + dir, _ := entry.FullPath.DirAndName() + return &filer_pb.FullEntry{ + Dir: dir, + Entry: entry.ToProtoEntry(), + } +} diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go index e50b3fa9a..cf4627b74 100644 --- a/weed/filer2/entry_codec.go +++ b/weed/filer2/entry_codec.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/gogo/protobuf/proto" + "github.com/golang/protobuf/proto" ) func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) { diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 6c3157e6c..b5876df82 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -40,7 +40,7 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file fileIds[interval.fileId] = true } for _, chunk := range chunks { - if found := fileIds[chunk.FileId]; found { + if _, found := fileIds[chunk.GetFileIdString()]; found { compacted = append(compacted, chunk) } else { garbage = append(garbage, chunk) @@ -50,15 +50,15 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file return } -func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) { +func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { fileIds := make(map[string]bool) - for _, interval := range newChunks { - fileIds[interval.FileId] = true + for _, interval := range bs { + fileIds[interval.GetFileIdString()] = true } - for _, chunk := range oldChunks { - if found := fileIds[chunk.FileId]; !found { - unused = append(unused, chunk) + for _, chunk := range as { + if _, found := fileIds[chunk.GetFileIdString()]; !found { + delta = append(delta, chunk) } } @@ -123,7 +123,7 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb. newV := newVisibleInterval( chunk.Offset, chunk.Offset+int64(chunk.Size), - chunk.FileId, + chunk.GetFileIdString(), chunk.Mtime, true, ) diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 1ee2f5ede..cf236b74d 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -3,6 +3,7 @@ package filer2 import ( "context" "fmt" + "google.golang.org/grpc" "math" "os" "path/filepath" @@ -20,17 +21,19 @@ var ( ) type Filer struct { - store FilerStore + store *FilerStoreWrapper directoryCache *ccache.Cache MasterClient *wdclient.MasterClient fileIdDeletionChan chan string + GrpcDialOption grpc.DialOption } -func NewFiler(masters []string) *Filer { +func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer { f := &Filer{ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), - MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters), + MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters), fileIdDeletionChan: make(chan string, 4096), + GrpcDialOption: grpcDialOption, } go f.loopProcessingDeletion() @@ -39,7 +42,7 @@ func NewFiler(masters []string) *Filer { } func (f *Filer) SetStore(store FilerStore) { - f.store = store + f.store = NewFilerStoreWrapper(store) } func (f *Filer) DisableDirectoryCache() { @@ -54,7 +57,19 @@ func (fs *Filer) KeepConnectedToMaster() { fs.MasterClient.KeepConnectedToMaster() } -func (f *Filer) CreateEntry(entry *Entry) error { +func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) { + return f.store.BeginTransaction(ctx) +} + +func (f *Filer) CommitTransaction(ctx context.Context) error { + return f.store.CommitTransaction(ctx) +} + +func (f *Filer) RollbackTransaction(ctx context.Context) error { + return f.store.RollbackTransaction(ctx) +} + +func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error { if string(entry.FullPath) == "/" { return nil @@ -67,7 +82,7 @@ func (f *Filer) CreateEntry(entry *Entry) error { var lastDirectoryEntry *Entry for i := 1; i < len(dirParts); i++ { - dirPath := "/" + filepath.Join(dirParts[:i]...) + dirPath := "/" + filepath.ToSlash(filepath.Join(dirParts[:i]...)) // fmt.Printf("%d directory: %+v\n", i, dirPath) // first check local cache @@ -76,7 +91,7 @@ func (f *Filer) CreateEntry(entry *Entry) error { // not found, check the store directly if dirEntry == nil { glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ = f.FindEntry(FullPath(dirPath)) + dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath)) } else { glog.V(4).Infof("found cached directory: %s", dirPath) } @@ -99,9 +114,9 @@ func (f *Filer) CreateEntry(entry *Entry) error { } glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) - mkdirErr := f.store.InsertEntry(dirEntry) + mkdirErr := f.store.InsertEntry(ctx, dirEntry) if mkdirErr != nil { - if _, err := f.FindEntry(FullPath(dirPath)); err == ErrNotFound { + if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound { return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) } } else { @@ -134,14 +149,16 @@ func (f *Filer) CreateEntry(entry *Entry) error { } */ - oldEntry, _ := f.FindEntry(entry.FullPath) + oldEntry, _ := f.FindEntry(ctx, entry.FullPath) if oldEntry == nil { - if err := f.store.InsertEntry(entry); err != nil { + if err := f.store.InsertEntry(ctx, entry); err != nil { + glog.Errorf("insert entry %s: %v", entry.FullPath, err) return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) } } else { - if err := f.UpdateEntry(oldEntry, entry); err != nil { + if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil { + glog.Errorf("update entry %s: %v", entry.FullPath, err) return fmt.Errorf("update entry %s: %v", entry.FullPath, err) } } @@ -153,19 +170,21 @@ func (f *Filer) CreateEntry(entry *Entry) error { return nil } -func (f *Filer) UpdateEntry(oldEntry, entry *Entry) (err error) { +func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) { if oldEntry != nil { if oldEntry.IsDirectory() && !entry.IsDirectory() { + glog.Errorf("existing %s is a directory", entry.FullPath) return fmt.Errorf("existing %s is a directory", entry.FullPath) } if !oldEntry.IsDirectory() && entry.IsDirectory() { + glog.Errorf("existing %s is a file", entry.FullPath) return fmt.Errorf("existing %s is a file", entry.FullPath) } } - return f.store.UpdateEntry(entry) + return f.store.UpdateEntry(ctx, entry) } -func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) { +func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) { now := time.Now() @@ -181,11 +200,11 @@ func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) { }, }, nil } - return f.store.FindEntry(p) + return f.store.FindEntry(ctx, p) } -func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDeleteChunks bool) (err error) { - entry, err := f.FindEntry(p) +func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, shouldDeleteChunks bool) (err error) { + entry, err := f.FindEntry(ctx, p) if err != nil { return err } @@ -198,37 +217,41 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet lastFileName := "" includeLastFile := false for limit > 0 { - entries, err := f.ListDirectoryEntries(p, lastFileName, includeLastFile, 1024) + entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, 1024) if err != nil { + glog.Errorf("list folder %s: %v", p, err) return fmt.Errorf("list folder %s: %v", p, err) } + if len(entries) == 0 { break - } else { - if isRecursive { - for _, sub := range entries { - lastFileName = sub.Name() - f.DeleteEntryMetaAndData(sub.FullPath, isRecursive, shouldDeleteChunks) - limit-- - if limit <= 0 { - break - } + } + + if isRecursive { + for _, sub := range entries { + lastFileName = sub.Name() + err = f.DeleteEntryMetaAndData(ctx, sub.FullPath, isRecursive, shouldDeleteChunks) + if err != nil { + return err } - } else { - if len(entries) > 0 { - return fmt.Errorf("folder %s is not empty", p) + limit-- + if limit <= 0 { + break } } - f.cacheDelDirectory(string(p)) - if len(entries) < 1024 { - break - } + } + + if len(entries) < 1024 { + break } } + + f.cacheDelDirectory(string(p)) + } if shouldDeleteChunks { - f.DeleteChunks(entry.Chunks) + f.DeleteChunks(p, entry.Chunks) } if p == "/" { @@ -238,17 +261,22 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks) - return f.store.DeleteEntry(p) + return f.store.DeleteEntry(ctx, p) } -func (f *Filer) ListDirectoryEntries(p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { +func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { p = p[0 : len(p)-1] } - return f.store.ListDirectoryEntries(p, startFileName, inclusive, limit) + return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) } func (f *Filer) cacheDelDirectory(dirpath string) { + + if dirpath == "/" { + return + } + if f.directoryCache == nil { return } @@ -257,6 +285,7 @@ func (f *Filer) cacheDelDirectory(dirpath string) { } func (f *Filer) cacheGetDirectory(dirpath string) *Entry { + if f.directoryCache == nil { return nil } diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go new file mode 100644 index 000000000..7e093eea2 --- /dev/null +++ b/weed/filer2/filer_client_util.go @@ -0,0 +1,163 @@ +package filer2 + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func VolumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} + +type FilerClient interface { + WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error +} + +func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { + var vids []string + for _, chunkView := range chunkViews { + vids = append(vids, VolumeId(chunkView.FileId)) + } + + vid2Locations := make(map[string]*filer_pb.Locations) + + err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + glog.V(4).Infof("read fh lookup volume id locations: %v", vids) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: vids, + }) + if err != nil { + return err + } + + vid2Locations = resp.LocationsMap + + return nil + }) + + if err != nil { + return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) + } + + var wg sync.WaitGroup + for _, chunkView := range chunkViews { + wg.Add(1) + go func(chunkView *ChunkView) { + defer wg.Done() + + glog.V(4).Infof("read fh reading chunk: %+v", chunkView) + + locations := vid2Locations[VolumeId(chunkView.FileId)] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", chunkView.FileId) + err = fmt.Errorf("failed to locate %s", chunkView.FileId) + return + } + + var n int64 + n, err = util.ReadUrl( + fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), + chunkView.Offset, + int(chunkView.Size), + buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)], + !chunkView.IsFullChunk) + + if err != nil { + + glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err) + + err = fmt.Errorf("failed to read http://%s/%s: %v", + locations.Locations[0].Url, chunkView.FileId, err) + return + } + + glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) + totalRead += n + + }(chunkView) + } + wg.Wait() + return +} + +func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) { + + dir, name := FullPath(fullFilePath).DirAndName() + + err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + } + + glog.V(3).Infof("read %s request: %v", fullFilePath, request) + resp, err := client.LookupDirectoryEntry(ctx, request) + if err != nil { + if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) { + return nil + } + glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err) + return err + } + + if resp.Entry != nil { + entry = resp.Entry + } + + return nil + }) + + return +} + +func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) { + + err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + paginationLimit := 1024 + + lastEntryName := "" + + for { + + request := &filer_pb.ListEntriesRequest{ + Directory: fullDirPath, + StartFromFileName: lastEntryName, + Limit: uint32(paginationLimit), + } + + glog.V(3).Infof("read directory: %v", request) + resp, err := client.ListEntries(ctx, request) + if err != nil { + return fmt.Errorf("list %s: %v", fullDirPath, err) + } + + for _, entry := range resp.Entries { + fn(entry) + lastEntryName = entry.Name + } + + if len(resp.Entries) < paginationLimit { + break + } + + } + + return nil + + }) + + return +} diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index 8fe8ae04f..fea93d57f 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -38,25 +38,28 @@ func (f *Filer) loopProcessingDeletion() { fileIds = append(fileIds, fid) if len(fileIds) >= 4096 { glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) fileIds = fileIds[:0] } case <-ticker.C: if len(fileIds) > 0 { glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) fileIds = fileIds[:0] } } } } -func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { +func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { - f.fileIdDeletionChan <- chunk.FileId + glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String()) + f.fileIdDeletionChan <- chunk.GetFileIdString() } } +// DeleteFileByFileId direct delete by file id. +// Only used when the fileId is not being managed by snapshots. func (f *Filer) DeleteFileByFileId(fileId string) { f.fileIdDeletionChan <- fileId } @@ -67,22 +70,19 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { return } if newEntry == nil { - f.DeleteChunks(oldEntry.Chunks) + f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks) } var toDelete []*filer_pb.FileChunk + newChunkIds := make(map[string]bool) + for _, newChunk := range newEntry.Chunks { + newChunkIds[newChunk.GetFileIdString()] = true + } for _, oldChunk := range oldEntry.Chunks { - found := false - for _, newChunk := range newEntry.Chunks { - if oldChunk.FileId == newChunk.FileId { - found = true - break - } - } - if !found { + if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { toDelete = append(toDelete, oldChunk) } } - f.DeleteChunks(toDelete) + f.DeleteChunks(oldEntry.FullPath, toDelete) } diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index b3c215249..c37381116 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -20,12 +20,18 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool) glog.V(3).Infof("notifying entry update %v", key) + newParentPath := "" + if newEntry != nil { + newParentPath, _ = newEntry.FullPath.DirAndName() + } + notification.Queue.SendMessage( key, &filer_pb.EventNotification{ - OldEntry: oldEntry.ToProtoEntry(), - NewEntry: newEntry.ToProtoEntry(), - DeleteChunks: deleteChunks, + OldEntry: oldEntry.ToProtoEntry(), + NewEntry: newEntry.ToProtoEntry(), + DeleteChunks: deleteChunks, + NewParentPath: newParentPath, }, ) diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index 9ef1d9d48..231c7fc68 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -1,7 +1,12 @@ package filer2 import ( + "context" "errors" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -10,12 +15,110 @@ type FilerStore interface { GetName() string // Initialize initializes the file store Initialize(configuration util.Configuration) error - InsertEntry(*Entry) error - UpdateEntry(*Entry) (err error) + InsertEntry(context.Context, *Entry) error + UpdateEntry(context.Context, *Entry) (err error) // err == filer2.ErrNotFound if not found - FindEntry(FullPath) (entry *Entry, err error) - DeleteEntry(FullPath) (err error) - ListDirectoryEntries(dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) + FindEntry(context.Context, FullPath) (entry *Entry, err error) + DeleteEntry(context.Context, FullPath) (err error) + ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) + + BeginTransaction(ctx context.Context) (context.Context, error) + CommitTransaction(ctx context.Context) error + RollbackTransaction(ctx context.Context) error } var ErrNotFound = errors.New("filer: no entry is found in filer store") + +type FilerStoreWrapper struct { + actualStore FilerStore +} + +func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { + return &FilerStoreWrapper{ + actualStore: store, + } +} + +func (fsw *FilerStoreWrapper) GetName() string { + return fsw.actualStore.GetName() +} + +func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error { + return fsw.actualStore.Initialize(configuration) +} + +func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { + stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "insert").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) + }() + + filer_pb.BeforeEntrySerialization(entry.Chunks) + return fsw.actualStore.InsertEntry(ctx, entry) +} + +func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { + stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "update").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "update").Observe(time.Since(start).Seconds()) + }() + + filer_pb.BeforeEntrySerialization(entry.Chunks) + return fsw.actualStore.UpdateEntry(ctx, entry) +} + +func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) { + stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "find").Observe(time.Since(start).Seconds()) + }() + + entry, err = fsw.actualStore.FindEntry(ctx, fp) + if err != nil { + return nil, err + } + filer_pb.AfterEntryDeserialization(entry.Chunks) + return +} + +func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) { + stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) + }() + + return fsw.actualStore.DeleteEntry(ctx, fp) +} + +func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { + stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "list").Observe(time.Since(start).Seconds()) + }() + + entries, err := fsw.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + for _, entry := range entries { + filer_pb.AfterEntryDeserialization(entry.Chunks) + } + return entries, err +} + +func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { + return fsw.actualStore.BeginTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { + return fsw.actualStore.CommitTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { + return fsw.actualStore.RollbackTransaction(ctx) +} diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go index be6e34431..191e51cf3 100644 --- a/weed/filer2/fullpath.go +++ b/weed/filer2/fullpath.go @@ -8,10 +8,7 @@ import ( type FullPath string func NewFullPath(dir, name string) FullPath { - if strings.HasSuffix(dir, "/") { - return FullPath(dir + name) - } - return FullPath(dir + "/" + name) + return FullPath(dir).Child(name) } func (fp FullPath) DirAndName() (string, string) { @@ -29,3 +26,11 @@ func (fp FullPath) Name() string { _, name := filepath.Split(string(fp)) return name } + +func (fp FullPath) Child(name string) FullPath { + dir := string(fp) + if strings.HasSuffix(dir, "/") { + return FullPath(dir + name) + } + return FullPath(dir + "/" + name) +} diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go index 179107e2c..d00eba859 100644 --- a/weed/filer2/leveldb/leveldb_store.go +++ b/weed/filer2/leveldb/leveldb_store.go @@ -2,12 +2,14 @@ package leveldb import ( "bytes" + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" weed_util "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" ) @@ -38,14 +40,30 @@ func (store *LevelDBStore) initialize(dir string) (err error) { return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) } - if store.db, err = leveldb.OpenFile(dir, nil); err != nil { + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, + } + + if store.db, err = leveldb.OpenFile(dir, opts); err != nil { glog.Infof("filer store open dir %s: %v", dir, err) return } return } -func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) { +func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *LevelDBStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { key := genKey(entry.DirAndName()) value, err := entry.EncodeAttributesAndChunks() @@ -64,12 +82,12 @@ func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) { return nil } -func (store *LevelDBStore) UpdateEntry(entry *filer2.Entry) (err error) { +func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return store.InsertEntry(entry) + return store.InsertEntry(ctx, entry) } -func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { key := genKey(fullpath.DirAndName()) data, err := store.db.Get(key, nil) @@ -94,7 +112,7 @@ func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.En return entry, nil } -func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) { +func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { key := genKey(fullpath.DirAndName()) err = store.db.Delete(key, nil) @@ -105,7 +123,7 @@ func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) { return nil } -func (store *LevelDBStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go index 5b214558f..904de8c97 100644 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ b/weed/filer2/leveldb/leveldb_store_test.go @@ -1,6 +1,7 @@ package leveldb import ( + "context" "github.com/chrislusf/seaweedfs/weed/filer2" "io/ioutil" "os" @@ -8,7 +9,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil) + filer := filer2.NewFiler(nil, nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDBStore{} @@ -18,6 +19,8 @@ func TestCreateAndFind(t *testing.T) { fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") + ctx := context.Background() + entry1 := &filer2.Entry{ FullPath: fullpath, Attr: filer2.Attr{ @@ -27,12 +30,12 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := filer.CreateEntry(entry1); err != nil { + if err := filer.CreateEntry(ctx, entry1); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } - entry, err := filer.FindEntry(fullpath) + entry, err := filer.FindEntry(ctx, fullpath) if err != nil { t.Errorf("find entry: %v", err) @@ -45,14 +48,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _ := filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is/one"), "", false, 100) + entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/"), "", false, 100) + entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -61,7 +64,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - filer := filer2.NewFiler(nil) + filer := filer2.NewFiler(nil, nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") defer os.RemoveAll(dir) store := &LevelDBStore{} @@ -69,8 +72,10 @@ func TestEmptyRoot(t *testing.T) { filer.SetStore(store) filer.DisableDirectoryCache() + ctx := context.Background() + // checking one upper directory - entries, err := filer.ListDirectoryEntries(filer2.FullPath("/"), "", false, 100) + entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go new file mode 100644 index 000000000..bce81e357 --- /dev/null +++ b/weed/filer2/leveldb2/leveldb2_store.go @@ -0,0 +1,208 @@ +package leveldb + +import ( + "bytes" + "context" + "crypto/md5" + "fmt" + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + weed_util "github.com/chrislusf/seaweedfs/weed/util" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + leveldb_util "github.com/syndtr/goleveldb/leveldb/util" +) + +func init() { + filer2.Stores = append(filer2.Stores, &LevelDB2Store{}) +} + +type LevelDB2Store struct { + dbs []*leveldb.DB + dbCount int +} + +func (store *LevelDB2Store) GetName() string { + return "leveldb2" +} + +func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration) (err error) { + dir := configuration.GetString("dir") + return store.initialize(dir, 8) +} + +func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) { + glog.Infof("filer store leveldb2 dir: %s", dir) + if err := weed_util.TestFolderWritable(dir); err != nil { + return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) + } + + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 4, + } + + for d := 0 ; d < dbCount; d++ { + dbFolder := fmt.Sprintf("%s/%02d", dir, d) + os.MkdirAll(dbFolder, 0755) + db, dbErr := leveldb.OpenFile(dbFolder, opts) + if dbErr != nil { + glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr) + return + } + store.dbs = append(store.dbs, db) + } + store.dbCount = dbCount + + return +} + +func (store *LevelDB2Store) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *LevelDB2Store) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *LevelDB2Store) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { + dir, name := entry.DirAndName() + key, partitionId := genKey(dir, name, store.dbCount) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + err = store.dbs[partitionId].Put(key, value, nil) + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + // println("saved", entry.FullPath, "chunks", len(entry.Chunks)) + + return nil +} + +func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { + dir, name := fullpath.DirAndName() + key, partitionId := genKey(dir, name, store.dbCount) + + data, err := store.dbs[partitionId].Get(key, nil) + + if err == leveldb.ErrNotFound { + return nil, filer2.ErrNotFound + } + if err != nil { + return nil, fmt.Errorf("get %s : %v", entry.FullPath, err) + } + + entry = &filer2.Entry{ + FullPath: fullpath, + } + err = entry.DecodeAttributesAndChunks(data) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data)) + + return entry, nil +} + +func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { + dir, name := fullpath.DirAndName() + key, partitionId := genKey(dir, name, store.dbCount) + + err = store.dbs[partitionId].Delete(key, nil) + if err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, + limit int) (entries []*filer2.Entry, err error) { + + directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) + lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount) + + iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil) + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + fileName := getNameFromKey(key) + if fileName == "" { + continue + } + if fileName == startFileName && !inclusive { + continue + } + limit-- + if limit < 0 { + break + } + entry := &filer2.Entry{ + FullPath: filer2.NewFullPath(string(fullpath), fileName), + } + + // println("list", entry.FullPath, "chunks", len(entry.Chunks)) + + if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + entries = append(entries, entry) + } + iter.Release() + + return entries, err +} + +func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) { + key, partitionId = hashToBytes(dirPath, dbCount) + key = append(key, []byte(fileName)...) + return key, partitionId +} + +func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) { + keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount) + if len(startFileName) > 0 { + keyPrefix = append(keyPrefix, []byte(startFileName)...) + } + return keyPrefix, partitionId +} + +func getNameFromKey(key []byte) string { + + return string(key[md5.Size:]) + +} + +// hash directory, and use last byte for partitioning +func hashToBytes(dir string, dbCount int) ([]byte, int) { + h := md5.New() + io.WriteString(h, dir) + + b := h.Sum(nil) + + x := b[len(b)-1] + + return b, int(x)%dbCount +} diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go new file mode 100644 index 000000000..a16803ca1 --- /dev/null +++ b/weed/filer2/leveldb2/leveldb2_store_test.go @@ -0,0 +1,88 @@ +package leveldb + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer2" + "io/ioutil" + "os" + "testing" +) + +func TestCreateAndFind(t *testing.T) { + filer := filer2.NewFiler(nil, nil) + dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") + defer os.RemoveAll(dir) + store := &LevelDB2Store{} + store.initialize(dir,2) + filer.SetStore(store) + filer.DisableDirectoryCache() + + fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") + + ctx := context.Background() + + entry1 := &filer2.Entry{ + FullPath: fullpath, + Attr: filer2.Attr{ + Mode: 0440, + Uid: 1234, + Gid: 5678, + }, + } + + if err := filer.CreateEntry(ctx, entry1); err != nil { + t.Errorf("create entry %v: %v", entry1.FullPath, err) + return + } + + entry, err := filer.FindEntry(ctx, fullpath) + + if err != nil { + t.Errorf("find entry: %v", err) + return + } + + if entry.FullPath != entry1.FullPath { + t.Errorf("find wrong entry: %v", entry.FullPath) + return + } + + // checking one upper directory + entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) + if len(entries) != 1 { + t.Errorf("list entries count: %v", len(entries)) + return + } + + // checking one upper directory + entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + if len(entries) != 1 { + t.Errorf("list entries count: %v", len(entries)) + return + } + +} + +func TestEmptyRoot(t *testing.T) { + filer := filer2.NewFiler(nil, nil) + dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2") + defer os.RemoveAll(dir) + store := &LevelDB2Store{} + store.initialize(dir,2) + filer.SetStore(store) + filer.DisableDirectoryCache() + + ctx := context.Background() + + // checking one upper directory + entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + if err != nil { + t.Errorf("list entries: %v", err) + return + } + if len(entries) != 0 { + t.Errorf("list entries count: %v", len(entries)) + return + } + +} diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go index 062f1cd1c..9c10a5472 100644 --- a/weed/filer2/memdb/memdb_store.go +++ b/weed/filer2/memdb/memdb_store.go @@ -1,11 +1,13 @@ package memdb import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/util" "github.com/google/btree" "strings" + "sync" ) func init() { @@ -13,7 +15,8 @@ func init() { } type MemDbStore struct { - tree *btree.BTree + tree *btree.BTree + treeLock sync.Mutex } type entryItem struct { @@ -33,21 +36,35 @@ func (store *MemDbStore) Initialize(configuration util.Configuration) (err error return nil } -func (store *MemDbStore) InsertEntry(entry *filer2.Entry) (err error) { +func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *MemDbStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *MemDbStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { // println("inserting", entry.FullPath) + store.treeLock.Lock() store.tree.ReplaceOrInsert(entryItem{entry}) + store.treeLock.Unlock() return nil } -func (store *MemDbStore) UpdateEntry(entry *filer2.Entry) (err error) { - if _, err = store.FindEntry(entry.FullPath); err != nil { +func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { + if _, err = store.FindEntry(ctx, entry.FullPath); err != nil { return fmt.Errorf("no such file %s : %v", entry.FullPath, err) } + store.treeLock.Lock() store.tree.ReplaceOrInsert(entryItem{entry}) + store.treeLock.Unlock() return nil } -func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}}) if item == nil { return nil, filer2.ErrNotFound @@ -56,12 +73,14 @@ func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entr return entry, nil } -func (store *MemDbStore) DeleteEntry(fullpath filer2.FullPath) (err error) { +func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { + store.treeLock.Lock() store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}}) + store.treeLock.Unlock() return nil } -func (store *MemDbStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { +func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { startFrom := string(fullpath) if startFileName != "" { diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go index cf813e04b..d823c5177 100644 --- a/weed/filer2/memdb/memdb_store_test.go +++ b/weed/filer2/memdb/memdb_store_test.go @@ -1,17 +1,20 @@ package memdb import ( + "context" "github.com/chrislusf/seaweedfs/weed/filer2" "testing" ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler(nil) + filer := filer2.NewFiler(nil, nil) store := &MemDbStore{} store.Initialize(nil) filer.SetStore(store) filer.DisableDirectoryCache() + ctx := context.Background() + fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") entry1 := &filer2.Entry{ @@ -23,12 +26,12 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := filer.CreateEntry(entry1); err != nil { + if err := filer.CreateEntry(ctx, entry1); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } - entry, err := filer.FindEntry(fullpath) + entry, err := filer.FindEntry(ctx, fullpath) if err != nil { t.Errorf("find entry: %v", err) @@ -43,12 +46,14 @@ func TestCreateAndFind(t *testing.T) { } func TestCreateFileAndList(t *testing.T) { - filer := filer2.NewFiler(nil) + filer := filer2.NewFiler(nil, nil) store := &MemDbStore{} store.Initialize(nil) filer.SetStore(store) filer.DisableDirectoryCache() + ctx := context.Background() + entry1 := &filer2.Entry{ FullPath: filer2.FullPath("/home/chris/this/is/one/file1.jpg"), Attr: filer2.Attr{ @@ -67,11 +72,11 @@ func TestCreateFileAndList(t *testing.T) { }, } - filer.CreateEntry(entry1) - filer.CreateEntry(entry2) + filer.CreateEntry(ctx, entry1) + filer.CreateEntry(ctx, entry2) // checking the 2 files - entries, err := filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is/one/"), "", false, 100) + entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "", false, 100) if err != nil { t.Errorf("list entries: %v", err) @@ -94,21 +99,21 @@ func TestCreateFileAndList(t *testing.T) { } // checking the offset - entries, err = filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100) + entries, err = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is"), "", false, 100) + entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking root directory - entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/"), "", false, 100) + entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -124,18 +129,18 @@ func TestCreateFileAndList(t *testing.T) { Gid: 5678, }, } - filer.CreateEntry(entry3) + filer.CreateEntry(ctx, entry3) // checking one upper directory - entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is"), "", false, 100) + entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100) if len(entries) != 2 { t.Errorf("list entries count: %v", len(entries)) return } // delete file and count - filer.DeleteEntryMetaAndData(file3Path, false, false) - entries, _ = filer.ListDirectoryEntries(filer2.FullPath("/home/chris/this/is"), "", false, 100) + filer.DeleteEntryMetaAndData(ctx, file3Path, false, false) + entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return diff --git a/weed/filer2/postgres/README.txt b/weed/filer2/postgres/README.txt index ef2ef683b..cb0c99c63 100644 --- a/weed/filer2/postgres/README.txt +++ b/weed/filer2/postgres/README.txt @@ -9,8 +9,8 @@ $PGHOME/bin/psql --username=postgres --password seaweedfs CREATE TABLE IF NOT EXISTS filemeta ( dirhash BIGINT, - name VARCHAR(1000), - directory VARCHAR(4096), + name VARCHAR(65535), + directory VARCHAR(65535), meta bytea, PRIMARY KEY (dirhash, name) ); diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go index 4f74a8a22..11c315391 100644 --- a/weed/filer2/redis/redis_cluster_store.go +++ b/weed/filer2/redis/redis_cluster_store.go @@ -21,12 +21,14 @@ func (store *RedisClusterStore) GetName() string { func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) { return store.initialize( configuration.GetStringSlice("addresses"), + configuration.GetString("password"), ) } -func (store *RedisClusterStore) initialize(addresses []string) (err error) { +func (store *RedisClusterStore) initialize(addresses []string, password string) (err error) { store.Client = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: addresses, + Addrs: addresses, + Password: password, }) return } diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go index 7fd7e1180..ce41d4d70 100644 --- a/weed/filer2/redis/universal_redis_store.go +++ b/weed/filer2/redis/universal_redis_store.go @@ -1,6 +1,7 @@ package redis import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -18,7 +19,17 @@ type UniversalRedisStore struct { Client redis.UniversalClient } -func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) { +func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { value, err := entry.EncodeAttributesAndChunks() if err != nil { @@ -42,12 +53,12 @@ func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) { return nil } -func (store *UniversalRedisStore) UpdateEntry(entry *filer2.Entry) (err error) { +func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { - return store.InsertEntry(entry) + return store.InsertEntry(ctx, entry) } -func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { data, err := store.Client.Get(string(fullpath)).Result() if err == redis.Nil { @@ -69,7 +80,7 @@ func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *fi return entry, nil } -func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err error) { +func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { _, err = store.Client.Del(string(fullpath)).Result() @@ -88,7 +99,7 @@ func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err err return nil } -func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() @@ -126,7 +137,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath, // fetch entry meta for _, fileName := range members { path := filer2.NewFullPath(string(fullpath), fileName) - entry, err := store.FindEntry(path) + entry, err := store.FindEntry(ctx, path) if err != nil { glog.V(0).Infof("list %s : %v", path, err) } else { diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go new file mode 100644 index 000000000..01b87cad1 --- /dev/null +++ b/weed/filer2/stream.go @@ -0,0 +1,41 @@ +package filer2 + +import ( + "io" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient" +) + +func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error { + + chunkViews := ViewFromChunks(chunks, offset, size) + + fileId2Url := make(map[string]string) + + for _, chunkView := range chunkViews { + + urlString, err := masterClient.LookupFileId(chunkView.FileId) + if err != nil { + glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) + return err + } + fileId2Url[chunkView.FileId] = urlString + } + + for _, chunkView := range chunkViews { + urlString := fileId2Url[chunkView.FileId] + _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) { + w.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + return err + } + } + + return nil + +} |
