diff options
Diffstat (limited to 'weed/filer/redis')
| -rw-r--r-- | weed/filer/redis/redis_cluster_store.go | 6 | ||||
| -rw-r--r-- | weed/filer/redis/redis_store.go | 2 | ||||
| -rw-r--r-- | weed/filer/redis/universal_redis_store.go | 49 | ||||
| -rw-r--r-- | weed/filer/redis/universal_redis_store_kv.go | 8 |
4 files changed, 35 insertions, 30 deletions
diff --git a/weed/filer/redis/redis_cluster_store.go b/weed/filer/redis/redis_cluster_store.go index 8af94ee55..9572058a8 100644 --- a/weed/filer/redis/redis_cluster_store.go +++ b/weed/filer/redis/redis_cluster_store.go @@ -3,7 +3,7 @@ package redis import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v8" ) func init() { @@ -20,8 +20,8 @@ func (store *RedisClusterStore) GetName() string { func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) { - configuration.SetDefault(prefix+"useReadOnly", true) - configuration.SetDefault(prefix+"routeByLatency", true) + configuration.SetDefault(prefix+"useReadOnly", false) + configuration.SetDefault(prefix+"routeByLatency", false) return store.initialize( configuration.GetStringSlice(prefix+"addresses"), diff --git a/weed/filer/redis/redis_store.go b/weed/filer/redis/redis_store.go index e152457ed..665352a63 100644 --- a/weed/filer/redis/redis_store.go +++ b/weed/filer/redis/redis_store.go @@ -3,7 +3,7 @@ package redis import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v8" ) func init() { diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index 0de9924a3..30d11a7f4 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v8" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -44,7 +44,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer. value = util.MaybeGzipData(value) } - _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result() + _, err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result() if err != nil { return fmt.Errorf("persisting %s : %v", entry.FullPath, err) @@ -52,7 +52,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer. dir, name := entry.FullPath.DirAndName() if name != "" { - _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result() + _, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result() if err != nil { return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) } @@ -68,7 +68,7 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer. func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { - data, err := store.Client.Get(string(fullpath)).Result() + data, err := store.Client.Get(ctx, string(fullpath)).Result() if err == redis.Nil { return nil, filer_pb.ErrNotFound } @@ -90,7 +90,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { - _, err = store.Client.Del(string(fullpath)).Result() + _, err = store.Client.Del(ctx, string(fullpath)).Result() if err != nil { return fmt.Errorf("delete %s : %v", fullpath, err) @@ -98,7 +98,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util dir, name := fullpath.DirAndName() if name != "" { - _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result() + _, err = store.Client.SRem(ctx, genDirectoryListKey(dir), name).Result() if err != nil { return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) } @@ -109,14 +109,14 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { - members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result() + members, err := store.Client.SMembers(ctx, genDirectoryListKey(string(fullpath))).Result() if err != nil { return fmt.Errorf("delete folder %s : %v", fullpath, err) } for _, fileName := range members { path := util.NewFullPath(string(fullpath), fileName) - _, err = store.Client.Del(string(path)).Result() + _, err = store.Client.Del(ctx, string(path)).Result() if err != nil { return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) } @@ -125,17 +125,16 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full return nil } -func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { - return nil, filer.ErrUnsupportedListDirectoryPrefixed +func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed } -func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, - limit int) (entries []*filer.Entry, err error) { +func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - dirListKey := genDirectoryListKey(string(fullpath)) - members, err := store.Client.SMembers(dirListKey).Result() + dirListKey := genDirectoryListKey(string(dirPath)) + members, err := store.Client.SMembers(ctx, dirListKey).Result() if err != nil { - return nil, fmt.Errorf("list %s : %v", fullpath, err) + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } // skip @@ -144,7 +143,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full for _, m := range members { if strings.Compare(m, startFileName) >= 0 { if m == startFileName { - if inclusive { + if includeStartFile { t = append(t, m) } } else { @@ -161,29 +160,35 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full }) // limit - if limit < len(members) { + if limit < int64(len(members)) { members = members[:limit] } // fetch entry meta for _, fileName := range members { - path := util.NewFullPath(string(fullpath), fileName) + path := util.NewFullPath(string(dirPath), fileName) entry, err := store.FindEntry(ctx, path) + lastFileName = fileName if err != nil { glog.V(0).Infof("list %s : %v", path, err) + if err == filer_pb.ErrNotFound { + continue + } } else { if entry.TtlSec > 0 { if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - store.Client.Del(string(path)).Result() - store.Client.SRem(dirListKey, fileName).Result() + store.Client.Del(ctx, string(path)).Result() + store.Client.SRem(ctx, dirListKey, fileName).Result() continue } } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } } - return entries, err + return lastFileName, err } func genDirectoryListKey(dir string) (dirList string) { diff --git a/weed/filer/redis/universal_redis_store_kv.go b/weed/filer/redis/universal_redis_store_kv.go index 0fc12c631..ad6e389ed 100644 --- a/weed/filer/redis/universal_redis_store_kv.go +++ b/weed/filer/redis/universal_redis_store_kv.go @@ -5,12 +5,12 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v8" ) func (store *UniversalRedisStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - _, err = store.Client.Set(string(key), value, 0).Result() + _, err = store.Client.Set(ctx, string(key), value, 0).Result() if err != nil { return fmt.Errorf("kv put: %v", err) @@ -21,7 +21,7 @@ func (store *UniversalRedisStore) KvPut(ctx context.Context, key []byte, value [ func (store *UniversalRedisStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - data, err := store.Client.Get(string(key)).Result() + data, err := store.Client.Get(ctx, string(key)).Result() if err == redis.Nil { return nil, filer.ErrKvNotFound @@ -32,7 +32,7 @@ func (store *UniversalRedisStore) KvGet(ctx context.Context, key []byte) (value func (store *UniversalRedisStore) KvDelete(ctx context.Context, key []byte) (err error) { - _, err = store.Client.Del(string(key)).Result() + _, err = store.Client.Del(ctx, string(key)).Result() if err != nil { return fmt.Errorf("kv delete: %v", err) |
