aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-02-26 17:29:22 -0800
committerGitHub <noreply@github.com>2025-02-26 17:29:22 -0800
commitdf436d342b68810a2669acf2aee2fa64fdc89cec (patch)
treef0da7618d8f78a45f5f49e88d6761f97de497fdb
parent65b0a7cf83e78b2c24ca56377f8aac78d0fd3496 (diff)
downloadseaweedfs-df436d342b68810a2669acf2aee2fa64fdc89cec.tar.xz
seaweedfs-df436d342b68810a2669acf2aee2fa64fdc89cec.zip
add cassandra2 (#6582)
-rw-r--r--weed/command/scaffold/filer.toml5
-rw-r--r--weed/filer/cassandra2/README.txt15
-rw-r--r--weed/filer/cassandra2/cassandra_store.go221
-rw-r--r--weed/filer/cassandra2/cassandra_store_kv.go63
4 files changed, 302 insertions, 2 deletions
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index 728aecb53..596a7b31d 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -139,12 +139,13 @@ connection_max_lifetime_seconds = 0
enableUpsert = true
upsertQuery = """UPSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"""
-[cassandra]
+[cassandra2]
# CREATE TABLE filemeta (
+# dirhash bigint,
# directory varchar,
# name varchar,
# meta blob,
-# PRIMARY KEY (directory, name)
+# PRIMARY KEY ((dirhash, directory), name)
# ) WITH CLUSTERING ORDER BY (name ASC);
enabled = false
keyspace = "seaweedfs"
diff --git a/weed/filer/cassandra2/README.txt b/weed/filer/cassandra2/README.txt
new file mode 100644
index 000000000..c8cf921de
--- /dev/null
+++ b/weed/filer/cassandra2/README.txt
@@ -0,0 +1,15 @@
+1. create a keyspace
+
+CREATE KEYSPACE seaweedfs WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
+
+2. create filemeta table
+
+ USE seaweedfs;
+
+ CREATE TABLE filemeta (
+ dirhash bigint,
+ directory varchar,
+ name varchar,
+ meta blob,
+ PRIMARY KEY ((dirhash, directory), name)
+ ) WITH CLUSTERING ORDER BY (name ASC);
diff --git a/weed/filer/cassandra2/cassandra_store.go b/weed/filer/cassandra2/cassandra_store.go
new file mode 100644
index 000000000..d0578669b
--- /dev/null
+++ b/weed/filer/cassandra2/cassandra_store.go
@@ -0,0 +1,221 @@
+package cassandra2
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/gocql/gocql"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &Cassandra2Store{})
+}
+
+type Cassandra2Store struct {
+ cluster *gocql.ClusterConfig
+ session *gocql.Session
+ superLargeDirectoryHash map[string]string
+}
+
+func (store *Cassandra2Store) GetName() string {
+ return "cassandra2"
+}
+
+func (store *Cassandra2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"keyspace"),
+ configuration.GetStringSlice(prefix+"hosts"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
+ configuration.GetString(prefix+"localDC"),
+ configuration.GetInt(prefix+"connection_timeout_millisecond"),
+ )
+}
+
+func (store *Cassandra2Store) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
+ dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
+ return
+}
+
+func (store *Cassandra2Store) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string, timeout int) (err error) {
+ store.cluster = gocql.NewCluster(hosts...)
+ if username != "" && password != "" {
+ store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
+ }
+ store.cluster.Keyspace = keyspace
+ store.cluster.Timeout = time.Duration(timeout) * time.Millisecond
+ glog.V(0).Infof("timeout = %d", timeout)
+ fallback := gocql.RoundRobinHostPolicy()
+ if localDC != "" {
+ fallback = gocql.DCAwareRoundRobinPolicy(localDC)
+ }
+ store.cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallback)
+ store.cluster.Consistency = gocql.LocalQuorum
+
+ store.session, err = store.cluster.CreateSession()
+ if err != nil {
+ glog.V(0).Infof("Failed to open cassandra2 store, hosts %v, keyspace %s", hosts, keyspace)
+ }
+
+ // set directory hash
+ store.superLargeDirectoryHash = make(map[string]string)
+ existingHash := make(map[string]string)
+ for _, dir := range superLargeDirectories {
+ // adding dir hash to avoid duplicated names
+ dirHash := util.Md5String([]byte(dir))[:4]
+ store.superLargeDirectoryHash[dir] = dirHash
+ if existingDir, found := existingHash[dirHash]; found {
+ glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
+ }
+ existingHash[dirHash] = dir
+ }
+ return
+}
+
+func (store *Cassandra2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *Cassandra2Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *Cassandra2Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *Cassandra2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ dir, name := entry.FullPath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
+
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
+ meta = util.MaybeGzipData(meta)
+ }
+
+ if err := store.session.Query(
+ "INSERT INTO filemeta (dirhash,directory,name,meta) VALUES(?,?,?,?) USING TTL ? ",
+ util.HashStringToLong(dir), dir, name, meta, entry.TtlSec).Exec(); err != nil {
+ return fmt.Errorf("insert %s: %s", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *Cassandra2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *Cassandra2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ dir, name := fullpath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
+
+ var data []byte
+ if err := store.session.Query(
+ "SELECT meta FROM filemeta WHERE dirhash=? AND directory=? AND name=?",
+ util.HashStringToLong(dir), dir, name).Scan(&data); err != nil {
+ if errors.Is(err, gocql.ErrNotFound) {
+ return nil, filer_pb.ErrNotFound
+ }
+ return nil, err
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *Cassandra2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
+
+ dir, name := fullpath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE dirhash=? AND directory=? AND name=?",
+ util.HashStringToLong(dir), dir, name).Exec(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *Cassandra2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+ if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
+ return nil // filer.ErrUnsupportedSuperLargeDirectoryListing
+ }
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE dirhash=? AND directory=?",
+ util.HashStringToLong(string(fullpath)), fullpath).Exec(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *Cassandra2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *Cassandra2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ if _, ok := store.isSuperLargeDirectory(string(dirPath)); ok {
+ return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
+ }
+
+ cqlStr := "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
+ if includeStartFile {
+ cqlStr = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
+ }
+
+ var data []byte
+ var name string
+ iter := store.session.Query(cqlStr, util.HashStringToLong(string(dirPath)), string(dirPath), startFileName, limit+1).Iter()
+ for iter.Scan(&name, &data) {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(dirPath), name),
+ }
+ lastFileName = name
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+ if err = iter.Close(); err != nil {
+ glog.V(0).Infof("list iterator close: %v", err)
+ }
+
+ return lastFileName, err
+}
+
+func (store *Cassandra2Store) Shutdown() {
+ store.session.Close()
+}
diff --git a/weed/filer/cassandra2/cassandra_store_kv.go b/weed/filer/cassandra2/cassandra_store_kv.go
new file mode 100644
index 000000000..3b8c3d51a
--- /dev/null
+++ b/weed/filer/cassandra2/cassandra_store_kv.go
@@ -0,0 +1,63 @@
+package cassandra2
+
+import (
+ "context"
+ "encoding/base64"
+ "fmt"
+ "github.com/gocql/gocql"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+func (store *Cassandra2Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ dir, name := genDirAndName(key)
+
+ if err := store.session.Query(
+ "INSERT INTO filemeta (dirhash,directory,name,meta) VALUES(?,?,?,?) USING TTL ? ",
+ util.HashStringToLong(dir), dir, name, value, 0).Exec(); err != nil {
+ return fmt.Errorf("kv insert: %s", err)
+ }
+
+ return nil
+}
+
+func (store *Cassandra2Store) KvGet(ctx context.Context, key []byte) (data []byte, err error) {
+ dir, name := genDirAndName(key)
+
+ if err := store.session.Query(
+ "SELECT meta FROM filemeta WHERE dirhash=? AND directory=? AND name=?",
+ util.HashStringToLong(dir), dir, name).Scan(&data); err != nil {
+ if err != gocql.ErrNotFound {
+ return nil, filer.ErrKvNotFound
+ }
+ }
+
+ if len(data) == 0 {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return data, nil
+}
+
+func (store *Cassandra2Store) KvDelete(ctx context.Context, key []byte) (err error) {
+ dir, name := genDirAndName(key)
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE dirhash=? AND directory=? AND name=?",
+ util.HashStringToLong(dir), dir, name).Exec(); err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
+
+func genDirAndName(key []byte) (dir string, name string) {
+ for len(key) < 8 {
+ key = append(key, 0)
+ }
+
+ dir = base64.StdEncoding.EncodeToString(key[:8])
+ name = base64.StdEncoding.EncodeToString(key[8:])
+
+ return
+}