aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-23 00:01:34 -0700
committerChris Lu <chris.lu@gmail.com>2020-03-23 00:01:34 -0700
commitc0f0fdb3baeb6e9852c6876b23c1404b2c5e833d (patch)
treeb52ffd7ac51e5c0472f0d0e2a8f5b1338cbf270c /weed/filer2
parentfbca6b29bd48eeed54511a9b53b937597eee5d19 (diff)
downloadseaweedfs-c0f0fdb3baeb6e9852c6876b23c1404b2c5e833d.tar.xz
seaweedfs-c0f0fdb3baeb6e9852c6876b23c1404b2c5e833d.zip
refactoring
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/abstract_sql/abstract_sql_store.go10
-rw-r--r--weed/filer2/cassandra/cassandra_store.go10
-rw-r--r--weed/filer2/entry.go3
-rw-r--r--weed/filer2/etcd/etcd_store.go12
-rw-r--r--weed/filer2/filer.go12
-rw-r--r--weed/filer2/filer_buckets.go3
-rw-r--r--weed/filer2/filer_client_util.go103
-rw-r--r--weed/filer2/filer_delete_entry.go3
-rw-r--r--weed/filer2/filer_notify_test.go4
-rw-r--r--weed/filer2/filerstore.go16
-rw-r--r--weed/filer2/fullpath.go42
-rw-r--r--weed/filer2/leveldb/leveldb_store.go12
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go12
-rw-r--r--weed/filer2/leveldb2/leveldb2_store.go12
-rw-r--r--weed/filer2/leveldb2/leveldb2_store_test.go12
-rw-r--r--weed/filer2/redis/universal_redis_store.go13
-rw-r--r--weed/filer2/stream.go15
17 files changed, 80 insertions, 214 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
index ff041d0a3..5ade18960 100644
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ b/weed/filer2/abstract_sql/abstract_sql_store.go
@@ -99,7 +99,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En
return nil
}
-func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
+func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer2.Entry, error) {
dir, name := fullpath.DirAndName()
row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
@@ -118,7 +118,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.Fu
return entry, nil
}
-func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
+func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
dir, name := fullpath.DirAndName()
@@ -135,7 +135,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
return nil
}
-func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath)
if err != nil {
@@ -150,7 +150,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
return nil
}
-func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
+func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive
if inclusive {
@@ -172,7 +172,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat
}
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), name),
+ FullPath: util.NewFullPath(string(fullpath), name),
}
if err = entry.DecodeAttributesAndChunks(data); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
index d57df23eb..5dd7d8036 100644
--- a/weed/filer2/cassandra/cassandra_store.go
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -75,7 +75,7 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entr
return store.InsertEntry(ctx, entry)
}
-func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
dir, name := fullpath.DirAndName()
var data []byte
@@ -102,7 +102,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.Full
return entry, nil
}
-func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
+func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
dir, name := fullpath.DirAndName()
@@ -115,7 +115,7 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu
return nil
}
-func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=?",
@@ -126,7 +126,7 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath
return nil
}
-func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
@@ -139,7 +139,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath
iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
for iter.Scan(&name, &data) {
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), name),
+ FullPath: util.NewFullPath(string(fullpath), name),
}
if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
err = decodeErr
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index c901927bb..ef6c8f9a6 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -5,6 +5,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type Attr struct {
@@ -27,7 +28,7 @@ func (attr Attr) IsDirectory() bool {
}
type Entry struct {
- FullPath
+ util.FullPath
Attr
Extended map[string][]byte
diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go
index 6c352c8d0..2ef65b4a0 100644
--- a/weed/filer2/etcd/etcd_store.go
+++ b/weed/filer2/etcd/etcd_store.go
@@ -92,7 +92,7 @@ func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (e
return store.InsertEntry(ctx, entry)
}
-func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
key := genKey(fullpath.DirAndName())
resp, err := store.client.Get(ctx, string(key))
@@ -115,7 +115,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath)
return entry, nil
}
-func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
key := genKey(fullpath.DirAndName())
if _, err := store.client.Delete(ctx, string(key)); err != nil {
@@ -125,7 +125,7 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
return nil
}
-func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
@@ -136,7 +136,7 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer
}
func (store *EtcdStore) ListDirectoryEntries(
- ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
+ ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
) (entries []*filer2.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
@@ -159,7 +159,7 @@ func (store *EtcdStore) ListDirectoryEntries(
break
}
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
err = decodeErr
@@ -179,7 +179,7 @@ func genKey(dirPath, fileName string) (key []byte) {
return key
}
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
keyPrefix = []byte(string(fullpath))
keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
if len(startFileName) > 0 {
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index e226552ad..0fdd4cf32 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -100,7 +100,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
// not found, check the store directly
if dirEntry == nil {
glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath))
+ dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath))
} else {
// glog.V(4).Infof("found cached directory: %s", dirPath)
}
@@ -112,7 +112,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
now := time.Now()
dirEntry = &Entry{
- FullPath: FullPath(dirPath),
+ FullPath: util.FullPath(dirPath),
Attr: Attr{
Mtime: now,
Crtime: now,
@@ -127,7 +127,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool) erro
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
mkdirErr := f.store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == filer_pb.ErrNotFound {
+ if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
@@ -207,7 +207,7 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er
return f.store.UpdateEntry(ctx, entry)
}
-func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) {
+func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
now := time.Now()
@@ -234,7 +234,7 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
}
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
@@ -251,7 +251,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileN
return entries, err
}
-func (f *Filer) doListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) {
+func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) {
listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
if listErr != nil {
return listedEntries, expiredCount, "", listErr
diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go
index 601b7dbf3..3fc4afdab 100644
--- a/weed/filer2/filer_buckets.go
+++ b/weed/filer2/filer_buckets.go
@@ -6,6 +6,7 @@ import (
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type BucketName string
@@ -28,7 +29,7 @@ func (f *Filer) LoadBuckets(dirBucketsPath string) {
limit := math.MaxInt32
- entries, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit)
+ entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(dirBucketsPath), "", false, limit)
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)
diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go
deleted file mode 100644
index 60b4dec18..000000000
--- a/weed/filer2/filer_client_util.go
+++ /dev/null
@@ -1,103 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "io"
- "math"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func VolumeId(fileId string) string {
- lastCommaIndex := strings.LastIndex(fileId, ",")
- if lastCommaIndex > 0 {
- return fileId[:lastCommaIndex]
- }
- return fileId
-}
-
-type FilerClient interface {
- WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
- AdjustedUrl(hostAndPort string) string
-}
-
-func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
-
- dir, name := fullFilePath.DirAndName()
-
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir,
- Name: name,
- }
-
- // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
- resp, err := filer_pb.LookupEntry(client, request)
- if err != nil {
- if err == filer_pb.ErrNotFound {
- return nil
- }
- glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
- return err
- }
-
- if resp.Entry == nil {
- // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
- return nil
- }
-
- entry = resp.Entry
- return nil
- })
-
- return
-}
-
-func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
-
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- lastEntryName := ""
-
- request := &filer_pb.ListEntriesRequest{
- Directory: string(fullDirPath),
- Prefix: prefix,
- StartFromFileName: lastEntryName,
- Limit: math.MaxUint32,
- }
-
- glog.V(3).Infof("read directory: %v", request)
- stream, err := client.ListEntries(context.Background(), request)
- if err != nil {
- return fmt.Errorf("list %s: %v", fullDirPath, err)
- }
-
- var prevEntry *filer_pb.Entry
- for {
- resp, recvErr := stream.Recv()
- if recvErr != nil {
- if recvErr == io.EOF {
- if prevEntry != nil {
- fn(prevEntry, true)
- }
- break
- } else {
- return recvErr
- }
- }
- if prevEntry != nil {
- fn(prevEntry, false)
- }
- prevEntry = resp.Entry
- }
-
- return nil
-
- })
-
- return
-}
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
index d0792ac66..e90c97c12 100644
--- a/weed/filer2/filer_delete_entry.go
+++ b/weed/filer2/filer_delete_entry.go
@@ -7,9 +7,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
-func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
+func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
if p == "/" {
return nil
}
diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go
index b74e2ad35..29170bfdf 100644
--- a/weed/filer2/filer_notify_test.go
+++ b/weed/filer2/filer_notify_test.go
@@ -5,13 +5,15 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/golang/protobuf/proto"
)
func TestProtoMarshalText(t *testing.T) {
oldEntry := &Entry{
- FullPath: FullPath("/this/path/to"),
+ FullPath: util.FullPath("/this/path/to"),
Attr: Attr{
Mtime: time.Now(),
Mode: 0644,
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index e3476aa96..f36c74f14 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -17,10 +17,10 @@ type FilerStore interface {
InsertEntry(context.Context, *Entry) error
UpdateEntry(context.Context, *Entry) (err error)
// err == filer2.ErrNotFound if not found
- FindEntry(context.Context, FullPath) (entry *Entry, err error)
- DeleteEntry(context.Context, FullPath) (err error)
- DeleteFolderChildren(context.Context, FullPath) (err error)
- ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
+ FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
+ DeleteEntry(context.Context, util.FullPath) (err error)
+ DeleteFolderChildren(context.Context, util.FullPath) (err error)
+ ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error
@@ -72,7 +72,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err
return fsw.actualStore.UpdateEntry(ctx, entry)
}
-func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) {
+func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc()
start := time.Now()
defer func() {
@@ -87,7 +87,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry
return
}
-func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) {
+func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc()
start := time.Now()
defer func() {
@@ -97,7 +97,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err
return fsw.actualStore.DeleteEntry(ctx, fp)
}
-func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) {
+func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc()
start := time.Now()
defer func() {
@@ -107,7 +107,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullP
return fsw.actualStore.DeleteFolderChildren(ctx, fp)
}
-func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
start := time.Now()
defer func() {
diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go
deleted file mode 100644
index 133069f93..000000000
--- a/weed/filer2/fullpath.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package filer2
-
-import (
- "path/filepath"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type FullPath string
-
-func NewFullPath(dir, name string) FullPath {
- return FullPath(dir).Child(name)
-}
-
-func (fp FullPath) DirAndName() (string, string) {
- dir, name := filepath.Split(string(fp))
- if dir == "/" {
- return dir, name
- }
- if len(dir) < 1 {
- return "/", ""
- }
- return dir[:len(dir)-1], name
-}
-
-func (fp FullPath) Name() string {
- _, name := filepath.Split(string(fp))
- return name
-}
-
-func (fp FullPath) Child(name string) FullPath {
- dir := string(fp)
- if strings.HasSuffix(dir, "/") {
- return FullPath(dir + name)
- }
- return FullPath(dir + "/" + name)
-}
-
-func (fp FullPath) AsInode() uint64 {
- return uint64(util.HashStringToLong(string(fp)))
-}
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index 9ddb9bacb..f8e56d93c 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -89,7 +89,7 @@ func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry)
return store.InsertEntry(ctx, entry)
}
-func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
key := genKey(fullpath.DirAndName())
data, err := store.db.Get(key, nil)
@@ -114,7 +114,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPa
return entry, nil
}
-func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
key := genKey(fullpath.DirAndName())
err = store.db.Delete(key, nil)
@@ -125,7 +125,7 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full
return nil
}
-func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
batch := new(leveldb.Batch)
@@ -153,7 +153,7 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath fi
return nil
}
-func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
@@ -176,7 +176,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath fi
break
}
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
err = decodeErr
@@ -197,7 +197,7 @@ func genKey(dirPath, fileName string) (key []byte) {
return key
}
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
keyPrefix = []byte(string(fullpath))
keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
if len(startFileName) > 0 {
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 497158420..db291a8dc 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -2,10 +2,12 @@ package leveldb
import (
"context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"io/ioutil"
"os"
"testing"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func TestCreateAndFind(t *testing.T) {
@@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) {
filer.SetStore(store)
filer.DisableDirectoryCache()
- fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
ctx := context.Background()
@@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
+ entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go
index 1e6827356..61fd2e9e6 100644
--- a/weed/filer2/leveldb2/leveldb2_store.go
+++ b/weed/filer2/leveldb2/leveldb2_store.go
@@ -98,7 +98,7 @@ func (store *LevelDB2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry
return store.InsertEntry(ctx, entry)
}
-func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer2.Entry, err error) {
dir, name := fullpath.DirAndName()
key, partitionId := genKey(dir, name, store.dbCount)
@@ -124,7 +124,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath filer2.FullP
return entry, nil
}
-func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
dir, name := fullpath.DirAndName()
key, partitionId := genKey(dir, name, store.dbCount)
@@ -136,7 +136,7 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful
return nil
}
-func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
batch := new(leveldb.Batch)
@@ -164,7 +164,7 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath f
return nil
}
-func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
@@ -188,7 +188,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath f
break
}
entry := &filer2.Entry{
- FullPath: filer2.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
@@ -211,7 +211,7 @@ func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int)
return key, partitionId
}
-func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string, dbCount int) (keyPrefix []byte, partitionId int) {
keyPrefix, partitionId = hashToBytes(string(fullpath), dbCount)
if len(startFileName) > 0 {
keyPrefix = append(keyPrefix, []byte(startFileName)...)
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
index dc94f2ac7..1fe76f8ee 100644
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -2,10 +2,12 @@ package leveldb
import (
"context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"io/ioutil"
"os"
"testing"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func TestCreateAndFind(t *testing.T) {
@@ -17,7 +19,7 @@ func TestCreateAndFind(t *testing.T) {
filer.SetStore(store)
filer.DisableDirectoryCache()
- fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
ctx := context.Background()
@@ -48,14 +50,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
+ entries, _ := filer.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, _ = filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, err := filer.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100)
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
index c9f59d37b..e5b9e8840 100644
--- a/weed/filer2/redis/universal_redis_store.go
+++ b/weed/filer2/redis/universal_redis_store.go
@@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
const (
@@ -61,7 +62,7 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2
return store.InsertEntry(ctx, entry)
}
-func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
data, err := store.Client.Get(string(fullpath)).Result()
if err == redis.Nil {
@@ -83,7 +84,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2
return entry, nil
}
-func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
_, err = store.Client.Del(string(fullpath)).Result()
@@ -102,7 +103,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file
return nil
}
-func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
if err != nil {
@@ -110,7 +111,7 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
}
for _, fileName := range members {
- path := filer2.NewFullPath(string(fullpath), fileName)
+ path := util.NewFullPath(string(fullpath), fileName)
_, err = store.Client.Del(string(path)).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
@@ -120,7 +121,7 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
return nil
}
-func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
dirListKey := genDirectoryListKey(string(fullpath))
@@ -158,7 +159,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
// fetch entry meta
for _, fileName := range members {
- path := filer2.NewFullPath(string(fullpath), fileName)
+ path := util.NewFullPath(string(fullpath), fileName)
entry, err := store.FindEntry(ctx, path)
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index 8819070ff..bb24312fd 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -71,13 +71,13 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [
}
}
-func NewChunkStreamReaderFromClient(filerClient FilerClient, chunkViews []*ChunkView) *ChunkStreamReader {
+func NewChunkStreamReaderFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView) *ChunkStreamReader {
return &ChunkStreamReader{
chunkViews: chunkViews,
lookupFileId: func(fileId string) (targetUrl string, err error) {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- vid := fileIdToVolumeId(fileId)
+ vid := VolumeId(fileId)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
@@ -178,10 +178,11 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return nil
}
-func fileIdToVolumeId(fileId string) (volumeId string) {
- parts := strings.Split(fileId, ",")
- if len(parts) != 2 {
- return fileId
+func VolumeId(fileId string) string {
+ lastCommaIndex := strings.LastIndex(fileId, ",")
+ if lastCommaIndex > 0 {
+ return fileId[:lastCommaIndex]
}
- return parts[0]
+ return fileId
}
+