diff options
Diffstat (limited to 'go/filer')
| -rw-r--r-- | go/filer/cassandra_store/cassandra_store.go | 87 | ||||
| -rw-r--r-- | go/filer/cassandra_store/schema.cql | 22 | ||||
| -rw-r--r-- | go/filer/client_operations.go | 6 | ||||
| -rw-r--r-- | go/filer/directory.go | 20 | ||||
| -rw-r--r-- | go/filer/embedded_filer/design.txt (renamed from go/filer/design.txt) | 0 | ||||
| -rw-r--r-- | go/filer/embedded_filer/directory.go | 15 | ||||
| -rw-r--r-- | go/filer/embedded_filer/directory_in_map.go (renamed from go/filer/directory_in_map.go) | 22 | ||||
| -rw-r--r-- | go/filer/embedded_filer/directory_test.go (renamed from go/filer/directory_test.go) | 2 | ||||
| -rw-r--r-- | go/filer/embedded_filer/filer_embedded.go (renamed from go/filer/filer_embedded.go) | 12 | ||||
| -rw-r--r-- | go/filer/embedded_filer/files_in_leveldb.go (renamed from go/filer/files_in_leveldb.go) | 16 | ||||
| -rw-r--r-- | go/filer/filer.go | 17 | ||||
| -rw-r--r-- | go/filer/flat_namespace/flat_namespace_filer.go | 50 | ||||
| -rw-r--r-- | go/filer/flat_namespace/flat_namespace_store.go | 9 | ||||
| -rw-r--r-- | go/filer/redis_store/redis_store.go | 48 |
14 files changed, 275 insertions, 51 deletions
diff --git a/go/filer/cassandra_store/cassandra_store.go b/go/filer/cassandra_store/cassandra_store.go new file mode 100644 index 000000000..83face686 --- /dev/null +++ b/go/filer/cassandra_store/cassandra_store.go @@ -0,0 +1,87 @@ +package cassandra_store + +import ( + "fmt" + + "github.com/chrislusf/weed-fs/go/glog" + + "github.com/gocql/gocql" +) + +/* + +Basically you need a table just like this: + +CREATE TABLE seaweed_files ( + path varchar, + fids list<varchar>, + PRIMARY KEY (path) +); + +Need to match flat_namespace.FlatNamespaceStore interface + Put(fullFileName string, fid string) (err error) + Get(fullFileName string) (fid string, err error) + Delete(fullFileName string) (fid string, err error) + +*/ +type CassandraStore struct { + cluster *gocql.ClusterConfig + session *gocql.Session +} + +func NewCassandraStore(keyspace string, hosts ...string) (c *CassandraStore, err error) { + c = &CassandraStore{} + c.cluster = gocql.NewCluster(hosts...) + c.cluster.Keyspace = keyspace + c.cluster.Consistency = gocql.Quorum + c.session, err = c.cluster.CreateSession() + if err != nil { + glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) + } + return +} + +func (c *CassandraStore) Put(fullFileName string, fid string) (err error) { + var input []string + input = append(input, fid) + if err := c.session.Query( + `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`, + fullFileName, input).Exec(); err != nil { + glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err) + return err + } + return nil +} +func (c *CassandraStore) Get(fullFileName string) (fid string, err error) { + var output []string + if err := c.session.Query( + `select fids FROM seaweed_files WHERE path = ? LIMIT 1`, + fullFileName).Consistency(gocql.One).Scan(&output); err != nil { + if err != gocql.ErrNotFound { + glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err) + } + } + if len(output) == 0 { + return "", fmt.Errorf("No file id found for %s", fullFileName) + } + return output[0], nil +} + +// Currently the fid is not returned +func (c *CassandraStore) Delete(fullFileName string) (fid string, err error) { + if err := c.session.Query( + `DELETE FROM seaweed_files WHERE path = ?`, + fullFileName).Exec(); err != nil { + if err != gocql.ErrNotFound { + glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err) + } + return "", err + } + return "", nil +} + +func (c *CassandraStore) Close() { + if c.session != nil { + c.session.Close() + } +} diff --git a/go/filer/cassandra_store/schema.cql b/go/filer/cassandra_store/schema.cql new file mode 100644 index 000000000..d6f2bb093 --- /dev/null +++ b/go/filer/cassandra_store/schema.cql @@ -0,0 +1,22 @@ +/* + +Here is the CQL to create the table.CassandraStore + +Optionally you can adjust the keyspace name and replication settings. + +For production server, very likely you want to set replication_factor to 3 + +*/ + +create keyspace seaweed WITH replication = { + 'class':'SimpleStrategy', + 'replication_factor':1 +}; + +use seaweed; + +CREATE TABLE seaweed_files ( + path varchar, + fids list<varchar>, + PRIMARY KEY (path) +); diff --git a/go/filer/client_operations.go b/go/filer/client_operations.go index 0b006289f..b38368735 100644 --- a/go/filer/client_operations.go +++ b/go/filer/client_operations.go @@ -1,12 +1,12 @@ package filer -import () - import ( - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "fmt" + + "github.com/chrislusf/weed-fs/go/util" + "net/url" ) diff --git a/go/filer/directory.go b/go/filer/directory.go deleted file mode 100644 index 956a2f504..000000000 --- a/go/filer/directory.go +++ /dev/null @@ -1,20 +0,0 @@ -package filer - -import () - -type DirectoryId int32 - -type DirectoryEntry struct { - Name string //dir name without path - Id DirectoryId -} - -type DirectoryManager interface { - FindDirectory(dirPath string) (DirectoryId, error) - ListDirectories(dirPath string) (dirs []DirectoryEntry, err error) - MakeDirectory(currentDirPath string, dirName string) (DirectoryId, error) - MoveUnderDirectory(oldDirPath string, newParentDirPath string) error - DeleteDirectory(dirPath string) error - //functions used by FUSE - FindDirectoryById(DirectoryId, error) -} diff --git a/go/filer/design.txt b/go/filer/embedded_filer/design.txt index 45fec8fbe..45fec8fbe 100644 --- a/go/filer/design.txt +++ b/go/filer/embedded_filer/design.txt diff --git a/go/filer/embedded_filer/directory.go b/go/filer/embedded_filer/directory.go new file mode 100644 index 000000000..8ab3e4aff --- /dev/null +++ b/go/filer/embedded_filer/directory.go @@ -0,0 +1,15 @@ +package embedded_filer + +import ( + "github.com/chrislusf/weed-fs/go/filer" +) + +type DirectoryManager interface { + FindDirectory(dirPath string) (filer.DirectoryId, error) + ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) + MakeDirectory(currentDirPath string, dirName string) (filer.DirectoryId, error) + MoveUnderDirectory(oldDirPath string, newParentDirPath string) error + DeleteDirectory(dirPath string) error + //functions used by FUSE + FindDirectoryById(filer.DirectoryId, error) +} diff --git a/go/filer/directory_in_map.go b/go/filer/embedded_filer/directory_in_map.go index 1d88a78be..a1d0f43bd 100644 --- a/go/filer/directory_in_map.go +++ b/go/filer/embedded_filer/directory_in_map.go @@ -1,8 +1,7 @@ -package filer +package embedded_filer import ( "bufio" - "github.com/chrislusf/weed-fs/go/util" "fmt" "io" "os" @@ -10,6 +9,9 @@ import ( "strconv" "strings" "sync" + + "github.com/chrislusf/weed-fs/go/filer" + "github.com/chrislusf/weed-fs/go/util" ) var writeLock sync.Mutex //serialize changes to dir.log @@ -18,12 +20,12 @@ type DirectoryEntryInMap struct { Name string Parent *DirectoryEntryInMap SubDirectories map[string]*DirectoryEntryInMap - Id DirectoryId + Id filer.DirectoryId } type DirectoryManagerInMap struct { Root *DirectoryEntryInMap - max DirectoryId + max filer.DirectoryId logFile *os.File isLoading bool } @@ -82,7 +84,7 @@ func (dm *DirectoryManagerInMap) processEachLine(line string) error { if pe != nil { return pe } - if e := dm.loadDirectory(parts[1], DirectoryId(v)); e != nil { + if e := dm.loadDirectory(parts[1], filer.DirectoryId(v)); e != nil { return e } case "mov": @@ -141,7 +143,7 @@ func (dm *DirectoryManagerInMap) findDirectory(dirPath string) (*DirectoryEntryI } return dir, nil } -func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (DirectoryId, error) { +func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (filer.DirectoryId, error) { d, e := dm.findDirectory(dirPath) if e == nil { return d.Id, nil @@ -149,7 +151,7 @@ func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (DirectoryId, err return dm.Root.Id, e } -func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId DirectoryId) error { +func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId filer.DirectoryId) error { dirPath = filepath.Clean(dirPath) if dirPath == "/" { return nil @@ -200,7 +202,7 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn return dir, created } -func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (DirectoryId, error) { +func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (filer.DirectoryId, error) { dir, _ := dm.makeDirectory(dirPath) return dir.Id, nil } @@ -227,13 +229,13 @@ func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParent return nil } -func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []DirectoryEntry, err error) { +func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []filer.DirectoryEntry, err error) { d, e := dm.findDirectory(dirPath) if e != nil { return dirNames, e } for k, v := range d.SubDirectories { - dirNames = append(dirNames, DirectoryEntry{Name: k, Id: v.Id}) + dirNames = append(dirNames, filer.DirectoryEntry{Name: k, Id: v.Id}) } return dirNames, nil } diff --git a/go/filer/directory_test.go b/go/filer/embedded_filer/directory_test.go index ea4903f03..cb0090bf2 100644 --- a/go/filer/directory_test.go +++ b/go/filer/embedded_filer/directory_test.go @@ -1,4 +1,4 @@ -package filer +package embedded_filer import ( "os" diff --git a/go/filer/filer_embedded.go b/go/filer/embedded_filer/filer_embedded.go index 3d3dac941..4321bb4e6 100644 --- a/go/filer/filer_embedded.go +++ b/go/filer/embedded_filer/filer_embedded.go @@ -1,11 +1,13 @@ -package filer +package embedded_filer import ( - "github.com/chrislusf/weed-fs/go/operation" "errors" "fmt" "path/filepath" "strings" + + "github.com/chrislusf/weed-fs/go/filer" + "github.com/chrislusf/weed-fs/go/operation" ) type FilerEmbedded struct { @@ -47,13 +49,13 @@ func (filer *FilerEmbedded) FindFile(filePath string) (fid string, err error) { } return filer.files.FindFile(dirId, file) } -func (filer *FilerEmbedded) FindDirectory(dirPath string) (dirId DirectoryId, err error) { +func (filer *FilerEmbedded) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { return filer.directories.FindDirectory(dirPath) } -func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []DirectoryEntry, err error) { +func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { return filer.directories.ListDirectories(dirPath) } -func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error) { +func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { dirId, e := filer.directories.FindDirectory(dirPath) if e != nil { return nil, e diff --git a/go/filer/files_in_leveldb.go b/go/filer/embedded_filer/files_in_leveldb.go index 41fbc74bd..c4fab734f 100644 --- a/go/filer/files_in_leveldb.go +++ b/go/filer/embedded_filer/files_in_leveldb.go @@ -1,7 +1,9 @@ -package filer +package embedded_filer import ( "bytes" + + "github.com/chrislusf/weed-fs/go/filer" "github.com/chrislusf/weed-fs/go/glog" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" @@ -26,7 +28,7 @@ func NewFileListInLevelDb(dir string) (fl *FileListInLevelDb, err error) { return } -func genKey(dirId DirectoryId, fileName string) []byte { +func genKey(dirId filer.DirectoryId, fileName string) []byte { ret := make([]byte, 0, 4+len(fileName)) for i := 3; i >= 0; i-- { ret = append(ret, byte(dirId>>(uint(i)*8))) @@ -35,25 +37,25 @@ func genKey(dirId DirectoryId, fileName string) []byte { return ret } -func (fl *FileListInLevelDb) CreateFile(dirId DirectoryId, fileName string, fid string) (err error) { +func (fl *FileListInLevelDb) CreateFile(dirId filer.DirectoryId, fileName string, fid string) (err error) { glog.V(4).Infoln("directory", dirId, "fileName", fileName, "fid", fid) return fl.db.Put(genKey(dirId, fileName), []byte(fid), nil) } -func (fl *FileListInLevelDb) DeleteFile(dirId DirectoryId, fileName string) (fid string, err error) { +func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { if fid, err = fl.FindFile(dirId, fileName); err != nil { return } err = fl.db.Delete(genKey(dirId, fileName), nil) return fid, err } -func (fl *FileListInLevelDb) FindFile(dirId DirectoryId, fileName string) (fid string, err error) { +func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { data, e := fl.db.Get(genKey(dirId, fileName), nil) if e != nil { return "", e } return string(data), nil } -func (fl *FileListInLevelDb) ListFiles(dirId DirectoryId, lastFileName string, limit int) (files []FileEntry) { +func (fl *FileListInLevelDb) ListFiles(dirId filer.DirectoryId, lastFileName string, limit int) (files []filer.FileEntry) { glog.V(4).Infoln("directory", dirId, "lastFileName", lastFileName, "limit", limit) dirKey := genKey(dirId, "") iter := fl.db.NewIterator(&util.Range{Start: genKey(dirId, lastFileName)}, nil) @@ -73,7 +75,7 @@ func (fl *FileListInLevelDb) ListFiles(dirId DirectoryId, lastFileName string, l break } } - files = append(files, FileEntry{Name: fileName, Id: FileId(string(iter.Value()))}) + files = append(files, filer.FileEntry{Name: fileName, Id: filer.FileId(string(iter.Value()))}) } iter.Release() return diff --git a/go/filer/filer.go b/go/filer/filer.go index de877fc1f..5a2584c4a 100644 --- a/go/filer/filer.go +++ b/go/filer/filer.go @@ -1,7 +1,5 @@ package filer -import () - type FileId string //file id on weedfs type FileEntry struct { @@ -9,13 +7,22 @@ type FileEntry struct { Id FileId `json:"fid,omitempty"` } +type DirectoryId int32 + +type DirectoryEntry struct { + Name string //dir name without path + Id DirectoryId +} + type Filer interface { - CreateFile(filePath string, fid string) (err error) - FindFile(filePath string) (fid string, err error) + CreateFile(fullFileName string, fid string) (err error) + FindFile(fullFileName string) (fid string, err error) + DeleteFile(fullFileName string) (fid string, err error) + + //Optional functions. embedded filer support these FindDirectory(dirPath string) (dirId DirectoryId, err error) ListDirectories(dirPath string) (dirs []DirectoryEntry, err error) ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error) DeleteDirectory(dirPath string, recursive bool) (err error) - DeleteFile(filePath string) (fid string, err error) Move(fromPath string, toPath string) (err error) } diff --git a/go/filer/flat_namespace/flat_namespace_filer.go b/go/filer/flat_namespace/flat_namespace_filer.go new file mode 100644 index 000000000..021b809d6 --- /dev/null +++ b/go/filer/flat_namespace/flat_namespace_filer.go @@ -0,0 +1,50 @@ +package flat_namespace + +import ( + "errors" + + "github.com/chrislusf/weed-fs/go/filer" +) + +type FlatNamesapceFiler struct { + master string + store FlatNamespaceStore +} + +var ( + NotImplemented = errors.New("Not Implemented for flat namespace meta data store!") +) + +func NewFlatNamesapceFiler(master string, store FlatNamespaceStore) *FlatNamesapceFiler { + return &FlatNamesapceFiler{ + master: master, + store: store, + } +} + +func (filer *FlatNamesapceFiler) CreateFile(fullFileName string, fid string) (err error) { + return filer.store.Put(fullFileName, fid) +} +func (filer *FlatNamesapceFiler) FindFile(fullFileName string) (fid string, err error) { + return filer.store.Get(fullFileName) +} +func (filer *FlatNamesapceFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + return 0, NotImplemented +} +func (filer *FlatNamesapceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { + return nil, NotImplemented +} +func (filer *FlatNamesapceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + return nil, NotImplemented +} +func (filer *FlatNamesapceFiler) DeleteDirectory(dirPath string, recursive bool) (err error) { + return NotImplemented +} + +func (filer *FlatNamesapceFiler) DeleteFile(fullFileName string) (fid string, err error) { + return filer.store.Delete(fullFileName) +} + +func (filer *FlatNamesapceFiler) Move(fromPath string, toPath string) error { + return NotImplemented +} diff --git a/go/filer/flat_namespace/flat_namespace_store.go b/go/filer/flat_namespace/flat_namespace_store.go new file mode 100644 index 000000000..832b70e40 --- /dev/null +++ b/go/filer/flat_namespace/flat_namespace_store.go @@ -0,0 +1,9 @@ +package flat_namespace + +import () + +type FlatNamespaceStore interface { + Put(fullFileName string, fid string) (err error) + Get(fullFileName string) (fid string, err error) + Delete(fullFileName string) (fid string, err error) +} diff --git a/go/filer/redis_store/redis_store.go b/go/filer/redis_store/redis_store.go new file mode 100644 index 000000000..8fe1c7ca5 --- /dev/null +++ b/go/filer/redis_store/redis_store.go @@ -0,0 +1,48 @@ +package redis_store + +import ( + redis "gopkg.in/redis.v2" +) + +type RedisStore struct { + Client *redis.Client +} + +func NewRedisStore(hostPort string, database int) *RedisStore { + client := redis.NewTCPClient(&redis.Options{ + Addr: hostPort, + Password: "", // no password set + DB: int64(database), + }) + return &RedisStore{Client: client} +} + +func (s *RedisStore) Get(fullFileName string) (fid string, err error) { + fid, err = s.Client.Get(fullFileName).Result() + if err == redis.Nil { + err = nil + } + return fid, err +} +func (s *RedisStore) Put(fullFileName string, fid string) (err error) { + _, err = s.Client.Set(fullFileName, fid).Result() + if err == redis.Nil { + err = nil + } + return err +} + +// Currently the fid is not returned +func (s *RedisStore) Delete(fullFileName string) (fid string, err error) { + _, err = s.Client.Del(fullFileName).Result() + if err == redis.Nil { + err = nil + } + return "", err +} + +func (c *RedisStore) Close() { + if c.Client != nil { + c.Client.Close() + } +} |
