aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/abstract_sql/abstract_sql_store.go11
-rw-r--r--weed/filer2/cassandra/cassandra_store.go13
-rw-r--r--weed/filer2/filer.go32
-rw-r--r--weed/filer2/filerstore.go11
-rw-r--r--weed/filer2/leveldb/leveldb_store.go13
-rw-r--r--weed/filer2/memdb/memdb_store.go13
-rw-r--r--weed/filer2/redis/universal_redis_store.go15
7 files changed, 57 insertions, 51 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
index 5f2990475..95ce9cb9f 100644
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ b/weed/filer2/abstract_sql/abstract_sql_store.go
@@ -1,6 +1,7 @@
package abstract_sql
import (
+ "context"
"database/sql"
"fmt"
@@ -18,7 +19,7 @@ type AbstractSqlStore struct {
SqlListInclusive string
}
-func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) {
+func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
@@ -38,7 +39,7 @@ func (store *AbstractSqlStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) {
+func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
@@ -58,7 +59,7 @@ func (store *AbstractSqlStore) UpdateEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entry, error) {
+func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
dir, name := fullpath.DirAndName()
row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir)
@@ -77,7 +78,7 @@ func (store *AbstractSqlStore) FindEntry(fullpath filer2.FullPath) (*filer2.Entr
return entry, nil
}
-func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error {
+func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
dir, name := fullpath.DirAndName()
@@ -94,7 +95,7 @@ func (store *AbstractSqlStore) DeleteEntry(fullpath filer2.FullPath) error {
return nil
}
-func (store *AbstractSqlStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
+func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive
if inclusive {
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
index 2c1f03182..e14a9e023 100644
--- a/weed/filer2/cassandra/cassandra_store.go
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -1,6 +1,7 @@
package cassandra
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -39,7 +40,7 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string) (err er
return
}
-func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) {
+func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
@@ -56,12 +57,12 @@ func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *CassandraStore) UpdateEntry(entry *filer2.Entry) (err error) {
+func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return store.InsertEntry(entry)
+ return store.InsertEntry(ctx, entry)
}
-func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
dir, name := fullpath.DirAndName()
var data []byte
@@ -88,7 +89,7 @@ func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.
return entry, nil
}
-func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error {
+func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
dir, name := fullpath.DirAndName()
@@ -101,7 +102,7 @@ func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) error {
return nil
}
-func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 50df3fc0b..4220e24d3 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -57,7 +57,7 @@ func (fs *Filer) KeepConnectedToMaster() {
fs.MasterClient.KeepConnectedToMaster()
}
-func (f *Filer) CreateEntry(entry *Entry) error {
+func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
if string(entry.FullPath) == "/" {
return nil
@@ -79,7 +79,7 @@ func (f *Filer) CreateEntry(entry *Entry) error {
// not found, check the store directly
if dirEntry == nil {
glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ = f.FindEntry(FullPath(dirPath))
+ dirEntry, _ = f.FindEntry(ctx, FullPath(dirPath))
} else {
glog.V(4).Infof("found cached directory: %s", dirPath)
}
@@ -102,9 +102,9 @@ func (f *Filer) CreateEntry(entry *Entry) error {
}
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
- mkdirErr := f.store.InsertEntry(dirEntry)
+ mkdirErr := f.store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil {
- if _, err := f.FindEntry(FullPath(dirPath)); err == ErrNotFound {
+ if _, err := f.FindEntry(ctx, FullPath(dirPath)); err == ErrNotFound {
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
@@ -137,14 +137,14 @@ func (f *Filer) CreateEntry(entry *Entry) error {
}
*/
- oldEntry, _ := f.FindEntry(entry.FullPath)
+ oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
if oldEntry == nil {
- if err := f.store.InsertEntry(entry); err != nil {
+ if err := f.store.InsertEntry(ctx, entry); err != nil {
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
} else {
- if err := f.UpdateEntry(oldEntry, entry); err != nil {
+ if err := f.UpdateEntry(ctx, oldEntry, entry); err != nil {
return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
}
}
@@ -156,7 +156,7 @@ func (f *Filer) CreateEntry(entry *Entry) error {
return nil
}
-func (f *Filer) UpdateEntry(oldEntry, entry *Entry) (err error) {
+func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil {
if oldEntry.IsDirectory() && !entry.IsDirectory() {
return fmt.Errorf("existing %s is a directory", entry.FullPath)
@@ -165,10 +165,10 @@ func (f *Filer) UpdateEntry(oldEntry, entry *Entry) (err error) {
return fmt.Errorf("existing %s is a file", entry.FullPath)
}
}
- return f.store.UpdateEntry(entry)
+ return f.store.UpdateEntry(ctx, entry)
}
-func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) {
+func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err error) {
now := time.Now()
@@ -184,11 +184,11 @@ func (f *Filer) FindEntry(p FullPath) (entry *Entry, err error) {
},
}, nil
}
- return f.store.FindEntry(p)
+ return f.store.FindEntry(ctx, p)
}
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, shouldDeleteChunks bool) (err error) {
- entry, err := f.FindEntry(p)
+ entry, err := f.FindEntry(ctx, p)
if err != nil {
return err
}
@@ -201,7 +201,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs
lastFileName := ""
includeLastFile := false
for limit > 0 {
- entries, err := f.ListDirectoryEntries(p, lastFileName, includeLastFile, 1024)
+ entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, 1024)
if err != nil {
return fmt.Errorf("list folder %s: %v", p, err)
}
@@ -241,14 +241,14 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecurs
f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
- return f.store.DeleteEntry(p)
+ return f.store.DeleteEntry(ctx, p)
}
-func (f *Filer) ListDirectoryEntries(p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
- return f.store.ListDirectoryEntries(p, startFileName, inclusive, limit)
+ return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
}
func (f *Filer) cacheDelDirectory(dirpath string) {
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index 9ef1d9d48..c10074eb2 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -1,6 +1,7 @@
package filer2
import (
+ "context"
"errors"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -10,12 +11,12 @@ type FilerStore interface {
GetName() string
// Initialize initializes the file store
Initialize(configuration util.Configuration) error
- InsertEntry(*Entry) error
- UpdateEntry(*Entry) (err error)
+ InsertEntry(context.Context, *Entry) error
+ UpdateEntry(context.Context, *Entry) (err error)
// err == filer2.ErrNotFound if not found
- FindEntry(FullPath) (entry *Entry, err error)
- DeleteEntry(FullPath) (err error)
- ListDirectoryEntries(dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
+ FindEntry(context.Context, FullPath) (entry *Entry, err error)
+ DeleteEntry(context.Context, FullPath) (err error)
+ ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
}
var ErrNotFound = errors.New("filer: no entry is found in filer store")
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index 179107e2c..60de11565 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -2,6 +2,7 @@ package leveldb
import (
"bytes"
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
@@ -45,7 +46,7 @@ func (store *LevelDBStore) initialize(dir string) (err error) {
return
}
-func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) {
+func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
key := genKey(entry.DirAndName())
value, err := entry.EncodeAttributesAndChunks()
@@ -64,12 +65,12 @@ func (store *LevelDBStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *LevelDBStore) UpdateEntry(entry *filer2.Entry) (err error) {
+func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return store.InsertEntry(entry)
+ return store.InsertEntry(ctx, entry)
}
-func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
key := genKey(fullpath.DirAndName())
data, err := store.db.Get(key, nil)
@@ -94,7 +95,7 @@ func (store *LevelDBStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.En
return entry, nil
}
-func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
+func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
key := genKey(fullpath.DirAndName())
err = store.db.Delete(key, nil)
@@ -105,7 +106,7 @@ func (store *LevelDBStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
return nil
}
-func (store *LevelDBStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go
index 062f1cd1c..d4c906f2d 100644
--- a/weed/filer2/memdb/memdb_store.go
+++ b/weed/filer2/memdb/memdb_store.go
@@ -1,6 +1,7 @@
package memdb
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -33,21 +34,21 @@ func (store *MemDbStore) Initialize(configuration util.Configuration) (err error
return nil
}
-func (store *MemDbStore) InsertEntry(entry *filer2.Entry) (err error) {
+func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
// println("inserting", entry.FullPath)
store.tree.ReplaceOrInsert(entryItem{entry})
return nil
}
-func (store *MemDbStore) UpdateEntry(entry *filer2.Entry) (err error) {
- if _, err = store.FindEntry(entry.FullPath); err != nil {
+func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
+ if _, err = store.FindEntry(ctx, entry.FullPath); err != nil {
return fmt.Errorf("no such file %s : %v", entry.FullPath, err)
}
store.tree.ReplaceOrInsert(entryItem{entry})
return nil
}
-func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}})
if item == nil {
return nil, filer2.ErrNotFound
@@ -56,12 +57,12 @@ func (store *MemDbStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entr
return entry, nil
}
-func (store *MemDbStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
+func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}})
return nil
}
-func (store *MemDbStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
+func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
startFrom := string(fullpath)
if startFileName != "" {
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
index 7fd7e1180..ec78f70e7 100644
--- a/weed/filer2/redis/universal_redis_store.go
+++ b/weed/filer2/redis/universal_redis_store.go
@@ -1,6 +1,7 @@
package redis
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -18,7 +19,7 @@ type UniversalRedisStore struct {
Client redis.UniversalClient
}
-func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) {
+func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
@@ -42,12 +43,12 @@ func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) {
return nil
}
-func (store *UniversalRedisStore) UpdateEntry(entry *filer2.Entry) (err error) {
+func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- return store.InsertEntry(entry)
+ return store.InsertEntry(ctx, entry)
}
-func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
data, err := store.Client.Get(string(fullpath)).Result()
if err == redis.Nil {
@@ -69,7 +70,7 @@ func (store *UniversalRedisStore) FindEntry(fullpath filer2.FullPath) (entry *fi
return entry, nil
}
-func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err error) {
+func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
_, err = store.Client.Del(string(fullpath)).Result()
@@ -88,7 +89,7 @@ func (store *UniversalRedisStore) DeleteEntry(fullpath filer2.FullPath) (err err
return nil
}
-func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
+func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
@@ -126,7 +127,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(fullpath filer2.FullPath,
// fetch entry meta
for _, fileName := range members {
path := filer2.NewFullPath(string(fullpath), fileName)
- entry, err := store.FindEntry(path)
+ entry, err := store.FindEntry(ctx, path)
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
} else {