aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/redis/universal_redis_store.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/redis/universal_redis_store.go')
-rw-r--r--weed/filer/redis/universal_redis_store.go49
1 files changed, 27 insertions, 22 deletions
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
index 0de9924a3..30d11a7f4 100644
--- a/weed/filer/redis/universal_redis_store.go
+++ b/weed/filer/redis/universal_redis_store.go
@@ -7,7 +7,7 @@ import (
"strings"
"time"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -44,7 +44,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
value = util.MaybeGzipData(value)
}
- _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
+ _, err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
@@ -52,7 +52,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
dir, name := entry.FullPath.DirAndName()
if name != "" {
- _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result()
+ _, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
@@ -68,7 +68,7 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.
func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
- data, err := store.Client.Get(string(fullpath)).Result()
+ data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
@@ -90,7 +90,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F
func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
- _, err = store.Client.Del(string(fullpath)).Result()
+ _, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
@@ -98,7 +98,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util
dir, name := fullpath.DirAndName()
if name != "" {
- _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result()
+ _, err = store.Client.SRem(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
@@ -109,14 +109,14 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util
func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
- members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
+ members, err := store.Client.SMembers(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete folder %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
- _, err = store.Client.Del(string(path)).Result()
+ _, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
@@ -125,17 +125,16 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
return nil
}
-func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
+func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- dirListKey := genDirectoryListKey(string(fullpath))
- members, err := store.Client.SMembers(dirListKey).Result()
+ dirListKey := genDirectoryListKey(string(dirPath))
+ members, err := store.Client.SMembers(ctx, dirListKey).Result()
if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// skip
@@ -144,7 +143,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
for _, m := range members {
if strings.Compare(m, startFileName) >= 0 {
if m == startFileName {
- if inclusive {
+ if includeStartFile {
t = append(t, m)
}
} else {
@@ -161,29 +160,35 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
})
// limit
- if limit < len(members) {
+ if limit < int64(len(members)) {
members = members[:limit]
}
// fetch entry meta
for _, fileName := range members {
- path := util.NewFullPath(string(fullpath), fileName)
+ path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
+ if err == filer_pb.ErrNotFound {
+ continue
+ }
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- store.Client.Del(string(path)).Result()
- store.Client.SRem(dirListKey, fileName).Result()
+ store.Client.Del(ctx, string(path)).Result()
+ store.Client.SRem(ctx, dirListKey, fileName).Result()
continue
}
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
- return entries, err
+ return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {