aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer2/cassandra/README.txt12
-rw-r--r--weed/filer2/cassandra/cassandra_store.go136
-rw-r--r--weed/filer2/configuration.go13
-rw-r--r--weed/server/filer_server.go1
4 files changed, 162 insertions, 0 deletions
diff --git a/weed/filer2/cassandra/README.txt b/weed/filer2/cassandra/README.txt
new file mode 100644
index 000000000..2d176229f
--- /dev/null
+++ b/weed/filer2/cassandra/README.txt
@@ -0,0 +1,12 @@
+1. create a keyspace
+
+CREATE KEYSPACE seaweedfs WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
+
+2. create filemeta table
+
+ CREATE TABLE filemeta (
+ directory varchar,
+ name varchar,
+ meta blob,
+ PRIMARY KEY (directory, name)
+ ) WITH CLUSTERING ORDER BY (name ASC);
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
new file mode 100644
index 000000000..316b6cce0
--- /dev/null
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -0,0 +1,136 @@
+package cassandra
+
+import (
+ "fmt"
+ "github.com/gocql/gocql"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/spf13/viper"
+)
+
+func init() {
+ filer2.Stores = append(filer2.Stores, &CassandraStore{})
+}
+
+type CassandraStore struct {
+ cluster *gocql.ClusterConfig
+ session *gocql.Session
+}
+
+func (store *CassandraStore) GetName() string {
+ return "cassandra"
+}
+
+func (store *CassandraStore) Initialize(viper *viper.Viper) (err error) {
+ return store.initialize(
+ viper.GetString("keyspace"),
+ viper.GetStringSlice("hosts"),
+ )
+}
+
+func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
+ store.cluster = gocql.NewCluster(hosts...)
+ store.cluster.Keyspace = keyspace
+ store.cluster.Consistency = gocql.LocalQuorum
+ store.session, err = store.cluster.CreateSession()
+ if err != nil {
+ glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
+ }
+ return
+}
+
+func (store *CassandraStore) InsertEntry(entry *filer2.Entry) (err error) {
+
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if err := store.session.Query(
+ "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?)",
+ dir, name, meta).Exec(); err != nil {
+ return fmt.Errorf("insert %s: %s", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *CassandraStore) UpdateEntry(entry *filer2.Entry) (err error) {
+
+ return store.InsertEntry(entry)
+}
+
+func (store *CassandraStore) FindEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+
+ dir, name := fullpath.DirAndName()
+ var data []byte
+ if err := store.session.Query(
+ "SELECT meta FROM filemeta WHERE directory=? AND name=?",
+ dir, name).Consistency(gocql.One).Scan(&data); err != nil {
+ if err != gocql.ErrNotFound {
+ return nil, fmt.Errorf("read entry %s: %v", fullpath, err)
+ }
+ }
+
+ if len(data) == 0 {
+ return nil, fmt.Errorf("not found: %s", fullpath)
+ }
+
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *CassandraStore) DeleteEntry(fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
+
+ entry, err = store.FindEntry(fullpath)
+ if err != nil {
+ return nil, nil
+ }
+
+ dir, name := fullpath.DirAndName()
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE directory=? AND name=?",
+ dir, name).Exec(); err != nil {
+ return entry, fmt.Errorf("delete %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *CassandraStore) ListDirectoryEntries(fullpath filer2.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer2.Entry, err error) {
+
+ cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
+ if inclusive {
+ cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
+ }
+
+ var data []byte
+ var name string
+ iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
+ for iter.Scan(&name, &data) {
+ entry := &filer2.Entry{
+ FullPath: filer2.NewFullPath(string(fullpath), name),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+ if err := iter.Close(); err != nil {
+ glog.V(0).Infof("list iterator close: %v", err)
+ }
+
+ return entries, err
+}
diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go
index 91a0d6f46..0112a883b 100644
--- a/weed/filer2/configuration.go
+++ b/weed/filer2/configuration.go
@@ -61,6 +61,19 @@ sslmode = "disable"
connection_max_idle = 100
connection_max_open = 100
+[cassandra]
+# CREATE TABLE filemeta (
+# directory varchar,
+# name varchar,
+# meta blob,
+# PRIMARY KEY (directory, name)
+# ) WITH CLUSTERING ORDER BY (name ASC);
+enabled = false
+keyspace="seaweedfs"
+hosts=[
+ "localhost:9042",
+]
+
`
)
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 4517733ae..6c8a2c079 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"