aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/cassandra/cassandra_store.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/cassandra/cassandra_store.go')
-rw-r--r--weed/filer/cassandra/cassandra_store.go163
1 files changed, 163 insertions, 0 deletions
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
new file mode 100644
index 000000000..fd161b1f1
--- /dev/null
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -0,0 +1,163 @@
+package cassandra
+
+import (
+ "context"
+ "fmt"
+ "github.com/gocql/gocql"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &CassandraStore{})
+}
+
+type CassandraStore struct {
+ cluster *gocql.ClusterConfig
+ session *gocql.Session
+}
+
+func (store *CassandraStore) GetName() string {
+ return "cassandra"
+}
+
+func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"keyspace"),
+ configuration.GetStringSlice(prefix+"hosts"),
+ )
+}
+
+func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
+ store.cluster = gocql.NewCluster(hosts...)
+ store.cluster.Keyspace = keyspace
+ store.cluster.Consistency = gocql.LocalQuorum
+ store.session, err = store.cluster.CreateSession()
+ if err != nil {
+ glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
+ }
+ return
+}
+
+func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if err := store.session.Query(
+ "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
+ dir, name, meta, entry.TtlSec).Exec(); err != nil {
+ return fmt.Errorf("insert %s: %s", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ dir, name := fullpath.DirAndName()
+ var data []byte
+ if err := store.session.Query(
+ "SELECT meta FROM filemeta WHERE directory=? AND name=?",
+ dir, name).Consistency(gocql.One).Scan(&data); err != nil {
+ if err != gocql.ErrNotFound {
+ return nil, filer_pb.ErrNotFound
+ }
+ }
+
+ if len(data) == 0 {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
+
+ dir, name := fullpath.DirAndName()
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE directory=? AND name=?",
+ dir, name).Exec(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE directory=?",
+ fullpath).Exec(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer.Entry, err error) {
+
+ cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
+ if inclusive {
+ cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
+ }
+
+ var data []byte
+ var name string
+ iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
+ for iter.Scan(&name, &data) {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(fullpath), name),
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ entries = append(entries, entry)
+ }
+ if err := iter.Close(); err != nil {
+ glog.V(0).Infof("list iterator close: %v", err)
+ }
+
+ return entries, err
+}
+
+func (store *CassandraStore) Shutdown() {
+ store.session.Close()
+}