aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-12-22 02:26:05 -0800
committerChris Lu <chris.lu@gmail.com>2020-12-22 02:26:05 -0800
commit90d785a15fb8c0379e068aab94066cf5e3622071 (patch)
treeb834d04131c69808c3d58dbac0e34498ce247bc6
parentfe46411cd4e675d910fdac1a5d91964376518c1c (diff)
downloadseaweedfs-90d785a15fb8c0379e068aab94066cf5e3622071.tar.xz
seaweedfs-90d785a15fb8c0379e068aab94066cf5e3622071.zip
filer: redis, redis cluster, cassandra support super large directory
-rw-r--r--weed/command/scaffold.go6
-rw-r--r--weed/filer/cassandra/cassandra_store.go44
-rw-r--r--weed/filer/filerstore.go8
-rw-r--r--weed/filer/redis2/redis_cluster_store.go4
-rw-r--r--weed/filer/redis2/redis_store.go4
-rw-r--r--weed/filer/redis2/universal_redis_store.go32
6 files changed, 89 insertions, 9 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 1ab763004..04a988027 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -138,12 +138,16 @@ hosts=[
]
username=""
password=""
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
[redis2]
enabled = false
address = "localhost:6379"
password = ""
database = 0
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
[redis_cluster2]
enabled = false
@@ -160,6 +164,8 @@ password = ""
readOnly = true
# automatically use the closest Redis server for reads
routeByLatency = true
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
[etcd]
enabled = false
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index ae8cb7a86..49f5625d9 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -16,8 +16,9 @@ func init() {
}
type CassandraStore struct {
- cluster *gocql.ClusterConfig
- session *gocql.Session
+ cluster *gocql.ClusterConfig
+ session *gocql.Session
+ superLargeDirectoryHash map[string]string
}
func (store *CassandraStore) GetName() string {
@@ -30,10 +31,16 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix
configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
-func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string) (err error) {
+func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
+ dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
+ return
+}
+
+func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) {
store.cluster = gocql.NewCluster(hosts...)
if username != "" && password != "" {
store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
@@ -44,6 +51,19 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string, usernam
if err != nil {
glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
}
+
+ // set directory hash
+ store.superLargeDirectoryHash = make(map[string]string)
+ existingHash := make(map[string]string)
+ for _, dir := range superLargeDirectories {
+ // adding dir hash to avoid duplicated names
+ dirHash := util.Md5String([]byte(dir))[:4]
+ store.superLargeDirectoryHash[dir] = dirHash
+ if existingDir, found := existingHash[dirHash]; found {
+ glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
+ }
+ existingHash[dirHash] = dir
+ }
return
}
@@ -60,6 +80,10 @@ func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
+
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
@@ -86,6 +110,10 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry
func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
dir, name := fullpath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
+
var data []byte
if err := store.session.Query(
"SELECT meta FROM filemeta WHERE directory=? AND name=?",
@@ -113,6 +141,9 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
dir, name := fullpath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=? AND name=?",
@@ -124,6 +155,9 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full
}
func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+ if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
+ return nil // filer.ErrUnsupportedSuperLargeDirectoryListing
+ }
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=?",
@@ -141,6 +175,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, f
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) {
+ if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
+ return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
+ }
+
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
if inclusive {
cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index b5a03c1d1..f1e6c6c35 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -7,10 +7,10 @@ import (
)
var (
- ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
- ErrKvNotImplemented = errors.New("kv not implemented yet")
- ErrKvNotFound = errors.New("kv: not found")
-
+ ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
+ ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
+ ErrKvNotImplemented = errors.New("kv not implemented yet")
+ ErrKvNotFound = errors.New("kv: not found")
)
type FilerStore interface {
diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go
index d155dbe88..c7742bb19 100644
--- a/weed/filer/redis2/redis_cluster_store.go
+++ b/weed/filer/redis2/redis_cluster_store.go
@@ -28,15 +28,17 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr
configuration.GetString(prefix+"password"),
configuration.GetBool(prefix+"useReadOnly"),
configuration.GetBool(prefix+"routeByLatency"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
-func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
+func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addresses,
Password: password,
ReadOnly: readOnly,
RouteByLatency: routeByLatency,
})
+ store.loadSuperLargeDirectories(superLargeDirectories)
return
}
diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go
index ed04c817b..da404ed4c 100644
--- a/weed/filer/redis2/redis_store.go
+++ b/weed/filer/redis2/redis_store.go
@@ -23,14 +23,16 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st
configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
-func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
+func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) {
store.Client = redis.NewClient(&redis.Options{
Addr: hostPort,
Password: password,
DB: database,
})
+ store.loadSuperLargeDirectories(superLargeDirectories)
return
}
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index 0374314c0..c6b566305 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -19,6 +19,27 @@ const (
type UniversalRedis2Store struct {
Client redis.UniversalClient
+ superLargeDirectoryHash map[string]string
+}
+
+func (store *UniversalRedis2Store) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
+ dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
+ return
+}
+
+func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectories []string) {
+ // set directory hash
+ store.superLargeDirectoryHash = make(map[string]string)
+ existingHash := make(map[string]string)
+ for _, dir := range superLargeDirectories {
+ // adding dir hash to avoid duplicated names
+ dirHash := util.Md5String([]byte(dir))[:4]
+ store.superLargeDirectoryHash[dir] = dirHash
+ if existingDir, found := existingHash[dirHash]; found {
+ glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
+ }
+ existingHash[dirHash] = dir
+ }
}
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
@@ -47,6 +68,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
}
dir, name := entry.FullPath.DirAndName()
+ if _, found := store.isSuperLargeDirectory(dir); found {
+ return nil
+ }
+
if name != "" {
if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
@@ -96,6 +121,9 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
}
dir, name := fullpath.DirAndName()
+ if _, found := store.isSuperLargeDirectory(dir); found {
+ return nil
+ }
if name != "" {
_, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
if err != nil {
@@ -108,6 +136,10 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+ if _, found := store.isSuperLargeDirectory(string(fullpath)); found {
+ return nil
+ }
+
members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)