aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2015-01-05 23:03:27 -0800
committerChris Lu <chris.lu@gmail.com>2015-01-05 23:03:27 -0800
commit49784d7f288d05590ddde5903ef71fe1754da83e (patch)
tree342bda192ff57b4eeaf593f4fa7ec06c9b37f998 /go
parent165734ce11658f34cb2137e7343516b066c3a1f7 (diff)
downloadseaweedfs-49784d7f288d05590ddde5903ef71fe1754da83e.tar.xz
seaweedfs-49784d7f288d05590ddde5903ef71fe1754da83e.zip
Add support for distributed filer metadata store.
Diffstat (limited to 'go')
-rw-r--r--go/filer/cassandra_store/cassandra_store.go87
-rw-r--r--go/filer/cassandra_store/schema.cql22
-rw-r--r--go/filer/embedded_filer/design.txt (renamed from go/filer/design.txt)0
-rw-r--r--go/filer/flat_namespace/flat_namespace_filer.go50
-rw-r--r--go/filer/flat_namespace/flat_namespace_store.go9
-rw-r--r--go/weed/filer.go13
-rw-r--r--go/weed/server.go3
-rw-r--r--go/weed/weed_server/filer_server.go22
-rw-r--r--go/weed/weed_server/filer_server_handlers.go2
9 files changed, 197 insertions, 11 deletions
diff --git a/go/filer/cassandra_store/cassandra_store.go b/go/filer/cassandra_store/cassandra_store.go
new file mode 100644
index 000000000..83face686
--- /dev/null
+++ b/go/filer/cassandra_store/cassandra_store.go
@@ -0,0 +1,87 @@
+package cassandra_store
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+
+ "github.com/gocql/gocql"
+)
+
+/*
+
+Basically you need a table just like this:
+
+CREATE TABLE seaweed_files (
+ path varchar,
+ fids list<varchar>,
+ PRIMARY KEY (path)
+);
+
+CassandraStore needs to match the flat_namespace.FlatNamespaceStore interface:
+ Put(fullFileName string, fid string) (err error)
+ Get(fullFileName string) (fid string, err error)
+ Delete(fullFileName string) (fid string, err error)
+
+*/
+// CassandraStore bundles the gocql cluster configuration with the session opened from it.
+type CassandraStore struct {
+ // cluster holds the hosts, keyspace and consistency settings assigned in NewCassandraStore.
+ cluster *gocql.ClusterConfig
+ // session is created from cluster; Put, Get and Delete query through it and Close shuts it down.
+ session *gocql.Session
+}
+
+// NewCassandraStore opens a session against the given Cassandra hosts and
+// keyspace, using Quorum as the default consistency level for queries.
+//
+// When the session cannot be created the failure is logged together with its
+// cause, and a nil store is returned alongside the error so that callers never
+// receive a store whose session is nil.
+func NewCassandraStore(keyspace string, hosts ...string) (c *CassandraStore, err error) {
+	cluster := gocql.NewCluster(hosts...)
+	cluster.Keyspace = keyspace
+	cluster.Consistency = gocql.Quorum
+
+	session, err := cluster.CreateSession()
+	if err != nil {
+		glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s: %v", hosts, keyspace, err)
+		return nil, err
+	}
+	return &CassandraStore{cluster: cluster, session: session}, nil
+}
+
+func (c *CassandraStore) Put(fullFileName string, fid string) (err error) {
+ var input []string
+ input = append(input, fid)
+ if err := c.session.Query(
+ `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`,
+ fullFileName, input).Exec(); err != nil {
+ glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err)
+ return err
+ }
+ return nil
+}
+// Get looks up the file id stored for fullFileName.
+//
+// The fids list of the matching row is read with consistency level One and its
+// first element is returned. A missing row (gocql.ErrNotFound) or an empty
+// list yields a "No file id found" error. Any other query failure is logged
+// and returned as-is, so callers can tell a store failure apart from a file
+// that simply does not exist.
+func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
+	var fids []string
+	scanErr := c.session.Query(
+		`select fids FROM seaweed_files WHERE path = ? LIMIT 1`,
+		fullFileName).Consistency(gocql.One).Scan(&fids)
+	if scanErr != nil && scanErr != gocql.ErrNotFound {
+		// The log call previously passed an extra argument that did not match the format verbs.
+		glog.V(0).Infof("Failed to find file %s: %v", fullFileName, scanErr)
+		return "", scanErr
+	}
+	if len(fids) == 0 {
+		return "", fmt.Errorf("No file id found for %s", fullFileName)
+	}
+	return fids[0], nil
+}
+
+// Delete removes the seaweed_files row stored for fullFileName.
+// The fid result is always the empty string; the deleted id is currently not
+// read back. Errors other than gocql.ErrNotFound are logged, and every error
+// is returned to the caller.
+func (c *CassandraStore) Delete(fullFileName string) (fid string, err error) {
+	execErr := c.session.Query(
+		`DELETE FROM seaweed_files WHERE path = ?`,
+		fullFileName).Exec()
+	if execErr == nil {
+		return "", nil
+	}
+	if execErr != gocql.ErrNotFound {
+		glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, execErr)
+	}
+	return "", execErr
+}
+
+// Close shuts down the session held by the store; it does nothing when no
+// session is set.
+func (c *CassandraStore) Close() {
+	if c.session == nil {
+		return
+	}
+	c.session.Close()
+}
diff --git a/go/filer/cassandra_store/schema.cql b/go/filer/cassandra_store/schema.cql
new file mode 100644
index 000000000..d6f2bb093
--- /dev/null
+++ b/go/filer/cassandra_store/schema.cql
@@ -0,0 +1,22 @@
+/*
+
+Here is the CQL to create the keyspace and table used by CassandraStore.
+
+Optionally you can adjust the keyspace name and replication settings.
+
+For a production server, you very likely want to set replication_factor to 3.
+
+*/
+
+create keyspace seaweed WITH replication = {
+ 'class':'SimpleStrategy',
+ 'replication_factor':1
+};
+
+use seaweed;
+
+CREATE TABLE seaweed_files (
+ path varchar,
+ fids list<varchar>,
+ PRIMARY KEY (path)
+);
diff --git a/go/filer/design.txt b/go/filer/embedded_filer/design.txt
index 45fec8fbe..45fec8fbe 100644
--- a/go/filer/design.txt
+++ b/go/filer/embedded_filer/design.txt
diff --git a/go/filer/flat_namespace/flat_namespace_filer.go b/go/filer/flat_namespace/flat_namespace_filer.go
new file mode 100644
index 000000000..021b809d6
--- /dev/null
+++ b/go/filer/flat_namespace/flat_namespace_filer.go
@@ -0,0 +1,50 @@
+package flat_namespace
+
+import (
+ "errors"
+
+ "github.com/chrislusf/weed-fs/go/filer"
+)
+
+// FlatNamesapceFiler serves the file-level filer operations by delegating to a
+// FlatNamespaceStore; its directory operations and Move return NotImplemented.
+type FlatNamesapceFiler struct {
+ // master is the value passed to NewFlatNamesapceFiler; no method in this file reads it.
+ master string
+ // store backs CreateFile, FindFile and DeleteFile.
+ store FlatNamespaceStore
+}
+
+var (
+ // NotImplemented is the error returned by the FlatNamesapceFiler operations
+ // that have no flat-namespace implementation (directory lookup, listing,
+ // directory deletion and Move).
+ NotImplemented = errors.New("Not Implemented for flat namespace meta data store!")
+)
+
+// NewFlatNamesapceFiler returns a filer that records master and forwards its
+// file operations to store.
+func NewFlatNamesapceFiler(master string, store FlatNamespaceStore) *FlatNamesapceFiler {
+	f := new(FlatNamesapceFiler)
+	f.master = master
+	f.store = store
+	return f
+}
+
+// CreateFile stores the fullFileName -> fid mapping through the backing store's Put.
+func (f *FlatNamesapceFiler) CreateFile(fullFileName string, fid string) (err error) {
+	putErr := f.store.Put(fullFileName, fid)
+	return putErr
+}
+// FindFile returns whatever the backing store's Get reports for fullFileName.
+func (f *FlatNamesapceFiler) FindFile(fullFileName string) (fid string, err error) {
+	foundFid, getErr := f.store.Get(fullFileName)
+	return foundFid, getErr
+}
+// FindDirectory is not available on a flat namespace store; it always returns
+// the zero DirectoryId together with NotImplemented.
+func (f *FlatNamesapceFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
+	// dirId is still its zero value here.
+	return dirId, NotImplemented
+}
+// ListDirectories is not available on a flat namespace store; it always
+// returns a nil slice together with NotImplemented.
+func (f *FlatNamesapceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
+	// dirs is never assigned, so it is returned as nil.
+	return dirs, NotImplemented
+}
+// ListFiles is not available on a flat namespace store; it always returns a
+// nil slice together with NotImplemented.
+func (f *FlatNamesapceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
+	// files is never assigned, so it is returned as nil.
+	return files, NotImplemented
+}
+// DeleteDirectory is not available on a flat namespace store; it always
+// returns NotImplemented.
+func (f *FlatNamesapceFiler) DeleteDirectory(dirPath string, recursive bool) (err error) {
+	err = NotImplemented
+	return err
+}
+
+// DeleteFile removes fullFileName through the backing store's Delete and
+// passes its results back unchanged.
+func (f *FlatNamesapceFiler) DeleteFile(fullFileName string) (fid string, err error) {
+	deletedFid, delErr := f.store.Delete(fullFileName)
+	return deletedFid, delErr
+}
+
+func (filer *FlatNamesapceFiler) Move(fromPath string, toPath string) error {
+ return NotImplemented
+}
diff --git a/go/filer/flat_namespace/flat_namespace_store.go b/go/filer/flat_namespace/flat_namespace_store.go
new file mode 100644
index 000000000..832b70e40
--- /dev/null
+++ b/go/filer/flat_namespace/flat_namespace_store.go
@@ -0,0 +1,9 @@
+package flat_namespace
+
+import ()
+
+// FlatNamespaceStore is the storage contract FlatNamesapceFiler relies on:
+// a mapping from a full file name to a file id.
+type FlatNamespaceStore interface {
+ // Put is called by FlatNamesapceFiler.CreateFile to record fid for fullFileName.
+ Put(fullFileName string, fid string) (err error)
+ // Get is called by FlatNamesapceFiler.FindFile to look up the fid for fullFileName.
+ Get(fullFileName string) (fid string, err error)
+ // Delete is called by FlatNamesapceFiler.DeleteFile to remove fullFileName.
+ // NOTE(review): the CassandraStore implementation always returns an empty fid — confirm callers do not depend on it.
+ Delete(fullFileName string) (fid string, err error)
+}
diff --git a/go/weed/filer.go b/go/weed/filer.go
index 11154f183..767864450 100644
--- a/go/weed/filer.go
+++ b/go/weed/filer.go
@@ -22,6 +22,8 @@ type FilerOptions struct {
defaultReplicaPlacement *string
dir *string
redirectOnRead *bool
+ cassandra_server *string
+ cassandra_keyspace *string
}
func init() {
@@ -32,13 +34,15 @@ func init() {
f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
+ f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
+ f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
}
var cmdFiler = &Command{
UsageLine: "filer -port=8888 -dir=/tmp -master=<ip:port>",
Short: "start a file server that points to a master server",
Long: `start a file server which accepts REST operation for any files.
-
+
//create or overwrite the file, the directories /path/to will be automatically created
POST /path/to/file
//get the file content
@@ -47,10 +51,10 @@ var cmdFiler = &Command{
POST /path/to/
//return a json format subdirectory and files listing
GET /path/to/
-
+
Current <fullpath~fileid> mapping metadata store is local embedded leveldb.
It should be highly scalable to hundreds of millions of files on a modest machine.
-
+
Future we will ensure it can avoid of being SPOF.
`,
@@ -63,8 +67,9 @@ func runFiler(cmd *Command, args []string) bool {
}
r := http.NewServeMux()
- _, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
+ _, nfs_err := weed_server.NewEmbeddedFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead,
+ *f.cassandra_server, *f.cassandra_keyspace,
)
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
diff --git a/go/weed/server.go b/go/weed/server.go
index 2db251944..3aae2e7fb 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -157,8 +157,9 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingFiler {
go func() {
r := http.NewServeMux()
- _, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
+ _, nfs_err := weed_server.NewEmbeddedFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
+ "", "",
)
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go
index b8cb9bd5d..325c9b626 100644
--- a/go/weed/weed_server/filer_server.go
+++ b/go/weed/weed_server/filer_server.go
@@ -5,7 +5,9 @@ import (
"strconv"
"github.com/chrislusf/weed-fs/go/filer"
+ "github.com/chrislusf/weed-fs/go/filer/cassandra_store"
"github.com/chrislusf/weed-fs/go/filer/embedded_filer"
+ "github.com/chrislusf/weed-fs/go/filer/flat_namespace"
"github.com/chrislusf/weed-fs/go/glog"
)
@@ -18,8 +20,9 @@ type FilerServer struct {
filer filer.Filer
}
-func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
+func NewEmbeddedFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
replication string, redirectOnRead bool,
+ cassandra_server string, cassandra_keyspace string,
) (fs *FilerServer, err error) {
fs = &FilerServer{
master: master,
@@ -29,12 +32,21 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
port: ":" + strconv.Itoa(port),
}
- if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
- glog.Fatal("Can not start filer in dir", dir, ": ", err.Error())
- return
+	// Pick the metadata store: the embedded filer rooted at dir when no
+	// cassandra server is configured, otherwise a flat namespace filer backed
+	// by cassandra.
+	if cassandra_server == "" {
+		if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
+			// The format string has two verbs; dir was previously missing from the arguments.
+			glog.Fatalf("Can not start filer in dir %s : %v", dir, err)
+			return
+		}
+
+		// Registered only in embedded mode — presumably because the flat
+		// namespace filer's Move returns NotImplemented; confirm against moveHandler.
+		r.HandleFunc("/admin/mv", fs.moveHandler)
+	} else {
+		// Distinct local names avoid shadowing the cassandra_store package and the named err result.
+		store, storeErr := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
+		if storeErr != nil {
+			glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, storeErr)
+		}
+		fs.filer = flat_namespace.NewFlatNamesapceFiler(master, store)
+	}
- r.HandleFunc("/admin/mv", fs.moveHandler)
r.HandleFunc("/", fs.filerHandler)
return fs, nil
diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go
index 6f22912a7..1afc1f852 100644
--- a/go/weed/weed_server/filer_server_handlers.go
+++ b/go/weed/weed_server/filer_server_handlers.go
@@ -179,7 +179,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
- glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error())
+ glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, db_err)
return
}