diff options
Diffstat (limited to 'weed/filer2')
| -rw-r--r-- | weed/filer2/abstract_sql/abstract_sql_store.go | 10 | ||||
| -rw-r--r-- | weed/filer2/cassandra/cassandra_store.go | 10 | ||||
| -rw-r--r-- | weed/filer2/entry.go | 3 | ||||
| -rw-r--r-- | weed/filer2/etcd/etcd_store.go | 12 | ||||
| -rw-r--r-- | weed/filer2/filer.go | 12 | ||||
| -rw-r--r-- | weed/filer2/filer_buckets.go | 3 | ||||
| -rw-r--r-- | weed/filer2/filer_client_util.go | 103 | ||||
| -rw-r--r-- | weed/filer2/filer_delete_entry.go | 3 | ||||
| -rw-r--r-- | weed/filer2/filer_notify_test.go | 4 | ||||
| -rw-r--r-- | weed/filer2/filerstore.go | 16 | ||||
| -rw-r--r-- | weed/filer2/fullpath.go | 42 | ||||
| -rw-r--r-- | weed/filer2/leveldb/leveldb_store.go | 12 | ||||
| -rw-r--r-- | weed/filer2/leveldb/leveldb_store_test.go | 12 | ||||
| -rw-r--r-- | weed/filer2/leveldb2/leveldb2_store.go | 12 | ||||
| -rw-r--r-- | weed/filer2/leveldb2/leveldb2_store_test.go | 12 | ||||
| -rw-r--r-- | weed/filer2/redis/universal_redis_store.go | 13 | ||||
| -rw-r--r-- | weed/filer2/stream.go | 15 |
17 files changed, 80 insertions, 214 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go index ff041d0a3..5ade18960 100644 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ b/weed/filer2/abstract_sql/abstract_sql_store.go @@ -99,7 +99,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En return nil } -func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) { +func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer2.Entry, error) { dir, name := fullpath.DirAndName() row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir) @@ -118,7 +118,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.Fu return entry, nil } -func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { +func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { dir, name := fullpath.DirAndName() @@ -135,7 +135,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2. return nil } -func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { +func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath) if err != nil { @@ -150,7 +150,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat return nil } -func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { +func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { sqlText := store.SqlListExclusive if inclusive { @@ -172,7 +172,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat } entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), name), + FullPath: util.NewFullPath(string(fullpath), name), } if err = entry.DecodeAttributesAndChunks(data); err != nil { glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go index d57df23eb..5dd7d8036 100644 --- a/weed/filer2/cassandra/cassandra_store.go +++ b/weed/filer2/cassandra/cassandra_store.go @@ -75,7 +75,7 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entr return store.InsertEntry(ctx, entry) } -func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { dir, name := fullpath.DirAndName() var data []byte @@ -102,7 +102,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.Full return entry, nil } -func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error { +func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { dir, name := fullpath.DirAndName() @@ -115,7 +115,7 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu return nil } -func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error { +func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { if err := store.session.Query( "DELETE FROM filemeta WHERE directory=?", @@ -126,7 +126,7 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath return nil } -func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" @@ -139,7 +139,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter() for iter.Scan(&name, &data) { entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), name), + FullPath: util.NewFullPath(string(fullpath), name), } if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil { err = decodeErr diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go index c901927bb..ef6c8f9a6 100644 --- a/weed/filer2/entry.go +++ b/weed/filer2/entry.go @@ -5,6 +5,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) type Attr struct { @@ -27,7 +28,7 @@ func (attr Attr) IsDirectory() bool { } type Entry struct { - FullPath + util.FullPath Attr Extended map[string][]byte diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go index 6c352c8d0..2ef65b4a0 100644 --- a/weed/filer2/etcd/etcd_store.go +++ b/weed/filer2/etcd/etcd_store.go @@ -92,7 +92,7 @@ func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (e return store.InsertEntry(ctx, entry) } -func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { key := genKey(fullpath.DirAndName()) resp, err := store.client.Get(ctx, string(key)) @@ -115,7 +115,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) return entry, nil } -func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { key := genKey(fullpath.DirAndName()) if _, err := store.client.Delete(ctx, string(key)); err != nil { @@ -125,7 +125,7 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat return nil } -func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil { @@ -136,7 +136,7 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer } func (store *EtcdStore) ListDirectoryEntries( - ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int, + ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, ) (entries []*filer2.Entry, err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") @@ -159,7 +159,7 @@ func (store *EtcdStore) ListDirectoryEntries( break } entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), + FullPath: weed_util.NewFullPath(string(fullpath), fileName), } if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil { err = decodeErr @@ -179,7 +179,7 @@ func genKey(dirPath, fileName string) (key []byte) { return key } -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { +func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) { keyPrefix = []byte(string(fullpath)) keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) if len(startFileName) > 0 { diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index e226552ad..0fdd4cf32 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -100,7 +100,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro // not found, check the store directly if dirEntry == nil { glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath)) + dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath)) } else { // glog.V(4).Infof("found cached directory: %s", dirPath) } @@ -112,7 +112,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro now := time.Now() dirEntry = &Entry{ - FullPath: FullPath(dirPath), + FullPath: util.FullPath(dirPath), Attr: Attr{ Mtime: now, Crtime: now, @@ -127,7 +127,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) mkdirErr := f.store.InsertEntry(ctx, dirEntry) if mkdirErr != nil { - if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == filer_pb.ErrNotFound { + if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) } @@ -207,7 +207,7 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er return f.store.UpdateEntry(ctx, entry) } -func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) { +func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) { now := time.Now() @@ -234,7 +234,7 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er } -func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { +func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { p = p[0 : len(p)-1] } @@ -251,7 +251,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileN return entries, err } -func (f *Filer) doListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) { +func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) { listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit) if listErr != nil { return listedEntries, expiredCount, "", listErr diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go index 601b7dbf3..3fc4afdab 100644 --- a/weed/filer2/filer_buckets.go +++ b/weed/filer2/filer_buckets.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" ) type BucketName string @@ -28,7 +29,7 @@ func (f *Filer) LoadBuckets(dirBucketsPath string) { limit := math.MaxInt32 - entries, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit) + entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(dirBucketsPath), "", false, limit) if err != nil { glog.V(1).Infof("no buckets found: %v", err) diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go deleted file mode 100644 index 60b4dec18..000000000 --- a/weed/filer2/filer_client_util.go +++ /dev/null @@ -1,103 +0,0 @@ -package filer2 - -import ( - "context" - "fmt" - "io" - "math" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -func VolumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId -} - -type FilerClient interface { - WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error - AdjustedUrl(hostAndPort string) string -} - -func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) { - - dir, name := fullFilePath.DirAndName() - - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir, - Name: name, - } - - // glog.V(3).Infof("read %s request: %v", fullFilePath, request) - resp, err := filer_pb.LookupEntry(client, request) - if err != nil { - if err == filer_pb.ErrNotFound { - return nil - } - glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err) - return err - } - - if resp.Entry == nil { - // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry) - return nil - } - - entry = resp.Entry - return nil - }) - - return -} - -func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) { - - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - lastEntryName := "" - - request := &filer_pb.ListEntriesRequest{ - Directory: string(fullDirPath), - Prefix: prefix, - StartFromFileName: lastEntryName, - Limit: math.MaxUint32, - } - - glog.V(3).Infof("read directory: %v", request) - stream, err := client.ListEntries(context.Background(), request) - if err != nil { - return fmt.Errorf("list %s: %v", fullDirPath, err) - } - - var prevEntry *filer_pb.Entry - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - if prevEntry != nil { - fn(prevEntry, true) - } - break - } else { - return recvErr - } - } - if prevEntry != nil { - fn(prevEntry, false) - } - prevEntry = resp.Entry - } - - return nil - - }) - - return -} diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go index d0792ac66..e90c97c12 100644 --- a/weed/filer2/filer_delete_entry.go +++ b/weed/filer2/filer_delete_entry.go @@ -7,9 +7,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) -func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) { +func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) { if p == "/" { return nil } diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go index b74e2ad35..29170bfdf 100644 --- a/weed/filer2/filer_notify_test.go +++ b/weed/filer2/filer_notify_test.go @@ -5,13 +5,15 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" ) func TestProtoMarshalText(t *testing.T) { oldEntry := &Entry{ - FullPath: FullPath("/this/path/to"), + FullPath: util.FullPath("/this/path/to"), Attr: Attr{ Mtime: time.Now(), Mode: 0644, diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index e3476aa96..f36c74f14 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -17,10 +17,10 @@ type FilerStore interface { InsertEntry(context.Context, *Entry) error UpdateEntry(context.Context, *Entry) (err error) // err == filer2.ErrNotFound if not found - FindEntry(context.Context, FullPath) (entry *Entry, err error) - DeleteEntry(context.Context, FullPath) (err error) - DeleteFolderChildren(context.Context, FullPath) (err error) - ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) + FindEntry(context.Context, util.FullPath) (entry *Entry, err error) + DeleteEntry(context.Context, util.FullPath) (err error) + DeleteFolderChildren(context.Context, util.FullPath) (err error) + ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) BeginTransaction(ctx context.Context) (context.Context, error) CommitTransaction(ctx context.Context) error @@ -72,7 +72,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err return fsw.actualStore.UpdateEntry(ctx, entry) } -func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) { +func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc() start := time.Now() defer func() { @@ -87,7 +87,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry return } -func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) { +func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc() start := time.Now() defer func() { @@ -97,7 +97,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err return fsw.actualStore.DeleteEntry(ctx, fp) } -func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) { +func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc() start := time.Now() defer func() { @@ -107,7 +107,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullP return fsw.actualStore.DeleteFolderChildren(ctx, fp) } -func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { +func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc() start := time.Now() defer func() { diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go deleted file mode 100644 index 133069f93..000000000 --- a/weed/filer2/fullpath.go +++ /dev/null @@ -1,42 +0,0 @@ -package filer2 - -import ( - "path/filepath" - "strings" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -type FullPath string - -func NewFullPath(dir, name string) FullPath { - return FullPath(dir).Child(name) -} - -func (fp FullPath) DirAndName() (string, string) { - dir, name := filepath.Split(string(fp)) - if dir == "/" { - return dir, name - } - if len(dir) < 1 { - return "/", "" - } - return dir[:len(dir)-1], name -} - -func (fp FullPath) Name() string { - _, name := filepath.Split(string(fp)) - return name -} - -func (fp FullPath) Child(name string) FullPath { - dir := string(fp) - if strings.HasSuffix(dir, "/") { - return FullPath(dir + name) - } - return FullPath(dir + "/" + name) -} - -func (fp FullPath) AsInode() uint64 { - return uint64(util.HashStringToLong(string(fp))) -} diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go index 9ddb9bacb..f8e56d93c 100644 --- a/weed/filer2/leveldb/leveldb_store.go +++ b/weed/filer2/leveldb/leveldb_store.go @@ -89,7 +89,7 @@ func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) return store.InsertEntry(ctx, entry) } -func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { key := genKey(fullpath.DirAndName()) data, err := store.db.Get(key, nil) @@ -114,7 +114,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPa return entry, nil } -func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { key := genKey(fullpath.DirAndName()) err = store.db.Delete(key, nil) @@ -125,7 +125,7 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full return nil } -func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { batch := new(leveldb.Batch) @@ -153,7 +153,7 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath fi return nil } -func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { directoryPrefix := genDirectoryKeyPrefix(fullpath, "") @@ -176,7 +176,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath fi break } entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), + FullPath: weed_util.NewFullPath(string(fullpath), fileName), } if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { err = decodeErr @@ -197,7 +197,7 @@ func genKey(dirPath, fileName string) (key []byte) { return key } -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) { +func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) { keyPrefix = []byte(string(fullpath)) keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR) if len(startFileName) > 0 { diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go index 497158420..db291a8dc 100644 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ b/weed/filer2/leveldb/leveldb_store_test.go @@ -2,10 +2,12 @@ package leveldb import ( "context" - "github.com/chrislusf/seaweedfs/weed/filer2" "io/ioutil" "os" "testing" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/util" ) func TestCreateAndFind(t *testing.T) { @@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) { filer.SetStore(store) filer.DisableDirectoryCache() - fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") + fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg") ctx := context.Background() @@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) + entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100) if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go index 1e6827356..61fd2e9e6 100644 --- a/weed/filer2/leveldb2/leveldb2_store.go +++ b/weed/filer2/leveldb2/leveldb2_store.go @@ -98,7 +98,7 @@ func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry return store.InsertEntry(ctx, entry) } -func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) { dir, name := fullpath.DirAndName() key, partitionId := genKey(dir, name, store.dbCount) @@ -124,7 +124,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullP return entry, nil } -func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { dir, name := fullpath.DirAndName() key, partitionId := genKey(dir, name, store.dbCount) @@ -136,7 +136,7 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful return nil } -func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) batch := new(leveldb.Batch) @@ -164,7 +164,7 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath f return nil } -func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount) @@ -188,7 +188,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath f break } entry := &filer2.Entry{ - FullPath: filer2.NewFullPath(string(fullpath), fileName), + FullPath: weed_util.NewFullPath(string(fullpath), fileName), } // println("list", entry.FullPath, "chunks", len(entry.Chunks)) @@ -211,7 +211,7 @@ func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) return key, partitionId } -func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) { +func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) { keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount) if len(startFileName) > 0 { keyPrefix = append(keyPrefix, []byte(startFileName)...) diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go index dc94f2ac7..1fe76f8ee 100644 --- a/weed/filer2/leveldb2/leveldb2_store_test.go +++ b/weed/filer2/leveldb2/leveldb2_store_test.go @@ -2,10 +2,12 @@ package leveldb import ( "context" - "github.com/chrislusf/seaweedfs/weed/filer2" "io/ioutil" "os" "testing" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/util" ) func TestCreateAndFind(t *testing.T) { @@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) { filer.SetStore(store) filer.DisableDirectoryCache() - fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg") + fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg") ctx := context.Background() @@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100) + entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100) if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100) + entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100) if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go index c9f59d37b..e5b9e8840 100644 --- a/weed/filer2/redis/universal_redis_store.go +++ b/weed/filer2/redis/universal_redis_store.go @@ -12,6 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) const ( @@ -61,7 +62,7 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2 return store.InsertEntry(ctx, entry) } -func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) { +func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) { data, err := store.Client.Get(string(fullpath)).Result() if err == redis.Nil { @@ -83,7 +84,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2 return entry, nil } -func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { _, err = store.Client.Del(string(fullpath)).Result() @@ -102,7 +103,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file return nil } -func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) { +func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() if err != nil { @@ -110,7 +111,7 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full } for _, fileName := range members { - path := filer2.NewFullPath(string(fullpath), fileName) + path := util.NewFullPath(string(fullpath), fileName) _, err = store.Client.Del(string(path)).Result() if err != nil { return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) @@ -120,7 +121,7 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full return nil } -func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, +func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) { dirListKey := genDirectoryListKey(string(fullpath)) @@ -158,7 +159,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full // fetch entry meta for _, fileName := range members { - path := filer2.NewFullPath(string(fullpath), fileName) + path := util.NewFullPath(string(fullpath), fileName) entry, err := store.FindEntry(ctx, path) if err != nil { glog.V(0).Infof("list %s : %v", path, err) diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 8819070ff..bb24312fd 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -71,13 +71,13 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [ } } -func NewChunkStreamReaderFromClient(filerClient FilerClient, chunkViews []*ChunkView) *ChunkStreamReader { +func NewChunkStreamReaderFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView) *ChunkStreamReader { return &ChunkStreamReader{ chunkViews: chunkViews, lookupFileId: func(fileId string) (targetUrl string, err error) { err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - vid := fileIdToVolumeId(fileId) + vid := VolumeId(fileId) resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) @@ -178,10 +178,11 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { return nil } -func fileIdToVolumeId(fileId string) (volumeId string) { - parts := strings.Split(fileId, ",") - if len(parts) != 2 { - return fileId +func VolumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] } - return parts[0] + return fileId } + |
