diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/cassandra_store/cassandra_store.go | 87 | ||||
| -rw-r--r-- | weed/filer/cassandra_store/schema.cql | 22 | ||||
| -rw-r--r-- | weed/filer/client_operations.go | 70 | ||||
| -rw-r--r-- | weed/filer/embedded_filer/design.txt | 26 | ||||
| -rw-r--r-- | weed/filer/embedded_filer/directory.go | 15 | ||||
| -rw-r--r-- | weed/filer/embedded_filer/directory_in_map.go | 310 | ||||
| -rw-r--r-- | weed/filer/embedded_filer/directory_test.go | 86 | ||||
| -rw-r--r-- | weed/filer/embedded_filer/filer_embedded.go | 141 | ||||
| -rw-r--r-- | weed/filer/embedded_filer/files_in_leveldb.go | 85 | ||||
| -rw-r--r-- | weed/filer/filer.go | 28 | ||||
| -rw-r--r-- | weed/filer/flat_namespace/flat_namespace_filer.go | 50 | ||||
| -rw-r--r-- | weed/filer/flat_namespace/flat_namespace_store.go | 9 | ||||
| -rw-r--r-- | weed/filer/redis_store/redis_store.go | 48 |
13 files changed, 977 insertions, 0 deletions
diff --git a/weed/filer/cassandra_store/cassandra_store.go b/weed/filer/cassandra_store/cassandra_store.go new file mode 100644 index 000000000..4ee2f65be --- /dev/null +++ b/weed/filer/cassandra_store/cassandra_store.go @@ -0,0 +1,87 @@ +package cassandra_store + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/weed/glog" + + "github.com/gocql/gocql" +) + +/* + +Basically you need a table just like this: + +CREATE TABLE seaweed_files ( + path varchar, + fids list<varchar>, + PRIMARY KEY (path) +); + +Need to match flat_namespace.FlatNamespaceStore interface + Put(fullFileName string, fid string) (err error) + Get(fullFileName string) (fid string, err error) + Delete(fullFileName string) (fid string, err error) + +*/ +type CassandraStore struct { + cluster *gocql.ClusterConfig + session *gocql.Session +} + +func NewCassandraStore(keyspace string, hosts ...string) (c *CassandraStore, err error) { + c = &CassandraStore{} + c.cluster = gocql.NewCluster(hosts...) + c.cluster.Keyspace = keyspace + c.cluster.Consistency = gocql.Quorum + c.session, err = c.cluster.CreateSession() + if err != nil { + glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) + } + return +} + +func (c *CassandraStore) Put(fullFileName string, fid string) (err error) { + var input []string + input = append(input, fid) + if err := c.session.Query( + `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`, + fullFileName, input).Exec(); err != nil { + glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err) + return err + } + return nil +} +func (c *CassandraStore) Get(fullFileName string) (fid string, err error) { + var output []string + if err := c.session.Query( + `select fids FROM seaweed_files WHERE path = ? LIMIT 1`, + fullFileName).Consistency(gocql.One).Scan(&output); err != nil { + if err != gocql.ErrNotFound { + glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err) + } + } + if len(output) == 0 { + return "", fmt.Errorf("No file id found for %s", fullFileName) + } + return output[0], nil +} + +// Currently the fid is not returned +func (c *CassandraStore) Delete(fullFileName string) (fid string, err error) { + if err := c.session.Query( + `DELETE FROM seaweed_files WHERE path = ?`, + fullFileName).Exec(); err != nil { + if err != gocql.ErrNotFound { + glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err) + } + return "", err + } + return "", nil +} + +func (c *CassandraStore) Close() { + if c.session != nil { + c.session.Close() + } +} diff --git a/weed/filer/cassandra_store/schema.cql b/weed/filer/cassandra_store/schema.cql new file mode 100644 index 000000000..d6f2bb093 --- /dev/null +++ b/weed/filer/cassandra_store/schema.cql @@ -0,0 +1,22 @@ +/* + +Here is the CQL to create the table.CassandraStore + +Optionally you can adjust the keyspace name and replication settings. + +For production server, very likely you want to set replication_factor to 3 + +*/ + +create keyspace seaweed WITH replication = { + 'class':'SimpleStrategy', + 'replication_factor':1 +}; + +use seaweed; + +CREATE TABLE seaweed_files ( + path varchar, + fids list<varchar>, + PRIMARY KEY (path) +); diff --git a/weed/filer/client_operations.go b/weed/filer/client_operations.go new file mode 100644 index 000000000..13e4854a4 --- /dev/null +++ b/weed/filer/client_operations.go @@ -0,0 +1,70 @@ +package filer + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/util" + + "net/url" +) + +type ApiRequest struct { + Command string //"listFiles", "listDirectories" + Directory string + FileName string +} + +type ListFilesResult struct { + Files []FileEntry + Error string `json:"error,omitempty"` +} + +func ListFiles(server string, directory string, fileName string) (*ListFilesResult, error) { + var ret ListFilesResult + if err := call(server, ApiRequest{Command: "listFiles", Directory: directory, FileName: fileName}, &ret); err == nil { + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil + } else { + return nil, err + } +} + +type ListDirectoriesResult struct { + Directories []DirectoryEntry + Error string `json:"error,omitempty"` +} + +func ListDirectories(server string, directory string) (*ListDirectoriesResult, error) { + var ret ListDirectoriesResult + if err := call(server, ApiRequest{Command: "listDirectories", Directory: directory}, &ret); err == nil { + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil + } else { + return nil, err + } +} + +func call(server string, request ApiRequest, ret interface{}) error { + b, err := json.Marshal(request) + if err != nil { + fmt.Println("error:", err) + return nil + } + values := make(url.Values) + values.Add("request", string(b)) + jsonBlob, err := util.Post("http://"+server+"/__api__", values) + if err != nil { + return err + } + err = json.Unmarshal(jsonBlob, ret) + if err != nil { + return err + } + return nil +} diff --git a/weed/filer/embedded_filer/design.txt b/weed/filer/embedded_filer/design.txt new file mode 100644 index 000000000..45fec8fbe --- /dev/null +++ b/weed/filer/embedded_filer/design.txt @@ -0,0 +1,26 @@ +Design Assumptions: +1. the number of directories are magnitudely smaller than the number of files +2. unlimited number of files under any directories +Phylosophy: + metadata for directories and files should be separated +Design: + Store directories in normal map + all of directories hopefully all be in memory + efficient to move/rename/list_directories + Log directory changes to append only log file + Store files in sorted string table in <dir_id/filename> format + efficient to list_files, just simple iterator + efficient to locate files, binary search + +Testing: +1. starting server, "weed server -filer=true" +2. posting files to different folders +curl -F "filename=@design.txt" "http://localhost:8888/sources/" +curl -F "filename=@design.txt" "http://localhost:8888/design/" +curl -F "filename=@directory.go" "http://localhost:8888/sources/weed/go/" +curl -F "filename=@directory.go" "http://localhost:8888/sources/testing/go/" +curl -F "filename=@filer.go" "http://localhost:8888/sources/weed/go/" +curl -F "filename=@filer_in_leveldb.go" "http://localhost:8888/sources/weed/go/" +curl "http://localhost:8888/?pretty=y" +curl "http://localhost:8888/sources/weed/go/?pretty=y" +curl "http://localhost:8888/sources/weed/go/?pretty=y" diff --git a/weed/filer/embedded_filer/directory.go b/weed/filer/embedded_filer/directory.go new file mode 100644 index 000000000..4d4bd1c59 --- /dev/null +++ b/weed/filer/embedded_filer/directory.go @@ -0,0 +1,15 @@ +package embedded_filer + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" +) + +type DirectoryManager interface { + FindDirectory(dirPath string) (filer.DirectoryId, error) + ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) + MakeDirectory(currentDirPath string, dirName string) (filer.DirectoryId, error) + MoveUnderDirectory(oldDirPath string, newParentDirPath string) error + DeleteDirectory(dirPath string) error + //functions used by FUSE + FindDirectoryById(filer.DirectoryId, error) +} diff --git a/weed/filer/embedded_filer/directory_in_map.go b/weed/filer/embedded_filer/directory_in_map.go new file mode 100644 index 000000000..5100f3531 --- /dev/null +++ b/weed/filer/embedded_filer/directory_in_map.go @@ -0,0 +1,310 @@ +package embedded_filer + +import ( + "bufio" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var writeLock sync.Mutex //serialize changes to dir.log + +type DirectoryEntryInMap struct { + sync.Mutex + Name string + Parent *DirectoryEntryInMap + subDirectories map[string]*DirectoryEntryInMap + Id filer.DirectoryId +} + +func (de *DirectoryEntryInMap) getChild(dirName string) (*DirectoryEntryInMap, bool) { + de.Lock() + defer de.Unlock() + child, ok := de.subDirectories[dirName] + return child, ok +} +func (de *DirectoryEntryInMap) addChild(dirName string, child *DirectoryEntryInMap) { + de.Lock() + defer de.Unlock() + de.subDirectories[dirName] = child +} +func (de *DirectoryEntryInMap) removeChild(dirName string) { + de.Lock() + defer de.Unlock() + delete(de.subDirectories, dirName) +} +func (de *DirectoryEntryInMap) hasChildren() bool { + de.Lock() + defer de.Unlock() + return len(de.subDirectories) > 0 +} +func (de *DirectoryEntryInMap) children() (dirNames []filer.DirectoryEntry) { + de.Lock() + defer de.Unlock() + for k, v := range de.subDirectories { + dirNames = append(dirNames, filer.DirectoryEntry{Name: k, Id: v.Id}) + } + return dirNames +} + +type DirectoryManagerInMap struct { + Root *DirectoryEntryInMap + max filer.DirectoryId + logFile *os.File + isLoading bool +} + +func (dm *DirectoryManagerInMap) newDirectoryEntryInMap(parent *DirectoryEntryInMap, name string) (d *DirectoryEntryInMap, err error) { + d = &DirectoryEntryInMap{Name: name, Parent: parent, subDirectories: make(map[string]*DirectoryEntryInMap)} + var parts []string + for p := d; p != nil && p.Name != ""; p = p.Parent { + parts = append(parts, p.Name) + } + n := len(parts) + if n <= 0 { + return nil, fmt.Errorf("Failed to create folder %s/%s", parent.Name, name) + } + for i := 0; i < n/2; i++ { + parts[i], parts[n-1-i] = parts[n-1-i], parts[i] + } + dm.max++ + d.Id = dm.max + dm.log("add", "/"+strings.Join(parts, "/"), strconv.Itoa(int(d.Id))) + return d, nil +} + +func (dm *DirectoryManagerInMap) log(words ...string) { + if !dm.isLoading { + dm.logFile.WriteString(strings.Join(words, "\t") + "\n") + } +} + +func NewDirectoryManagerInMap(dirLogFile string) (dm *DirectoryManagerInMap, err error) { + dm = &DirectoryManagerInMap{} + //dm.Root do not use newDirectoryEntryInMap, since dm.max will be changed + dm.Root = &DirectoryEntryInMap{subDirectories: make(map[string]*DirectoryEntryInMap)} + if dm.logFile, err = os.OpenFile(dirLogFile, os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot write directory log file %s: %v", dirLogFile, err) + } + return dm, dm.load() +} + +func (dm *DirectoryManagerInMap) processEachLine(line string) error { + if strings.HasPrefix(line, "#") { + return nil + } + if line == "" { + return nil + } + parts := strings.Split(line, "\t") + if len(parts) == 0 { + return nil + } + switch parts[0] { + case "add": + v, pe := strconv.Atoi(parts[2]) + if pe != nil { + return pe + } + if e := dm.loadDirectory(parts[1], filer.DirectoryId(v)); e != nil { + return e + } + case "mov": + newName := "" + if len(parts) >= 4 { + newName = parts[3] + } + if e := dm.MoveUnderDirectory(parts[1], parts[2], newName); e != nil { + return e + } + case "del": + if e := dm.DeleteDirectory(parts[1]); e != nil { + return e + } + default: + fmt.Printf("line %s has %s!\n", line, parts[0]) + return nil + } + return nil +} +func (dm *DirectoryManagerInMap) load() error { + dm.max = 0 + lines := bufio.NewReader(dm.logFile) + dm.isLoading = true + defer func() { dm.isLoading = false }() + for { + line, err := util.Readln(lines) + if err != nil && err != io.EOF { + return err + } + if pe := dm.processEachLine(string(line)); pe != nil { + return pe + } + if err == io.EOF { + return nil + } + } +} + +func (dm *DirectoryManagerInMap) findDirectory(dirPath string) (*DirectoryEntryInMap, error) { + if dirPath == "" { + return dm.Root, nil + } + dirPath = CleanFilePath(dirPath) + if dirPath == "/" { + return dm.Root, nil + } + parts := strings.Split(dirPath, "/") + dir := dm.Root + for i := 1; i < len(parts); i++ { + if sub, ok := dir.getChild(parts[i]); ok { + dir = sub + } else { + return dm.Root, fmt.Errorf("Directory %s Not Found", dirPath) + } + } + return dir, nil +} +func (dm *DirectoryManagerInMap) FindDirectory(dirPath string) (filer.DirectoryId, error) { + d, e := dm.findDirectory(dirPath) + if e == nil { + return d.Id, nil + } + return dm.Root.Id, e +} + +func (dm *DirectoryManagerInMap) loadDirectory(dirPath string, dirId filer.DirectoryId) error { + dirPath = CleanFilePath(dirPath) + if dirPath == "/" { + return nil + } + parts := strings.Split(dirPath, "/") + dir := dm.Root + for i := 1; i < len(parts); i++ { + sub, ok := dir.getChild(parts[i]) + if !ok { + writeLock.Lock() + if sub2, createdByOtherThread := dir.getChild(parts[i]); createdByOtherThread { + sub = sub2 + } else { + if i != len(parts)-1 { + writeLock.Unlock() + return fmt.Errorf("%s should be created after parent %s", dirPath, parts[i]) + } + var err error + sub, err = dm.newDirectoryEntryInMap(dir, parts[i]) + if err != nil { + writeLock.Unlock() + return err + } + if sub.Id != dirId { + writeLock.Unlock() + // the dir.log should be the same order as in-memory directory id + return fmt.Errorf("%s should be have id %v instead of %v", dirPath, sub.Id, dirId) + } + dir.addChild(parts[i], sub) + } + writeLock.Unlock() + } + dir = sub + } + return nil +} + +func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEntryInMap, created bool) { + dirPath = CleanFilePath(dirPath) + if dirPath == "/" { + return dm.Root, false + } + parts := strings.Split(dirPath, "/") + dir = dm.Root + for i := 1; i < len(parts); i++ { + sub, ok := dir.getChild(parts[i]) + if !ok { + writeLock.Lock() + if sub2, createdByOtherThread := dir.getChild(parts[i]); createdByOtherThread { + sub = sub2 + } else { + var err error + sub, err = dm.newDirectoryEntryInMap(dir, parts[i]) + if err != nil { + writeLock.Unlock() + return nil, false + } + dir.addChild(parts[i], sub) + created = true + } + writeLock.Unlock() + } + dir = sub + } + return dir, created +} + +func (dm *DirectoryManagerInMap) MakeDirectory(dirPath string) (filer.DirectoryId, error) { + dir, _ := dm.makeDirectory(dirPath) + return dir.Id, nil +} + +func (dm *DirectoryManagerInMap) MoveUnderDirectory(oldDirPath string, newParentDirPath string, newName string) error { + writeLock.Lock() + defer writeLock.Unlock() + oldDir, oe := dm.findDirectory(oldDirPath) + if oe != nil { + return oe + } + parentDir, pe := dm.findDirectory(newParentDirPath) + if pe != nil { + return pe + } + dm.log("mov", oldDirPath, newParentDirPath, newName) + oldDir.Parent.removeChild(oldDir.Name) + if newName == "" { + newName = oldDir.Name + } + parentDir.addChild(newName, oldDir) + oldDir.Name = newName + oldDir.Parent = parentDir + return nil +} + +func (dm *DirectoryManagerInMap) ListDirectories(dirPath string) (dirNames []filer.DirectoryEntry, err error) { + d, e := dm.findDirectory(dirPath) + if e != nil { + return dirNames, e + } + return d.children(), nil +} +func (dm *DirectoryManagerInMap) DeleteDirectory(dirPath string) error { + writeLock.Lock() + defer writeLock.Unlock() + if dirPath == "/" { + return fmt.Errorf("Can not delete %s", dirPath) + } + d, e := dm.findDirectory(dirPath) + if e != nil { + return e + } + if d.hasChildren() { + return fmt.Errorf("dir %s still has sub directories", dirPath) + } + d.Parent.removeChild(d.Name) + d.Parent = nil + dm.log("del", dirPath) + return nil +} + +func CleanFilePath(fp string) string { + ret := filepath.Clean(fp) + if os.PathSeparator == '\\' { + return strings.Replace(ret, "\\", "/", -1) + } + return ret +} diff --git a/weed/filer/embedded_filer/directory_test.go b/weed/filer/embedded_filer/directory_test.go new file mode 100644 index 000000000..c8b3f1f30 --- /dev/null +++ b/weed/filer/embedded_filer/directory_test.go @@ -0,0 +1,86 @@ +package embedded_filer + +import ( + "os" + "strings" + "testing" +) + +func TestDirectory(t *testing.T) { + dm, _ := NewDirectoryManagerInMap("/tmp/dir.log") + defer func() { + if true { + os.Remove("/tmp/dir.log") + } + }() + dm.MakeDirectory("/a/b/c") + dm.MakeDirectory("/a/b/d") + dm.MakeDirectory("/a/b/e") + dm.MakeDirectory("/a/b/e/f") + dm.MakeDirectory("/a/b/e/f/g") + dm.MoveUnderDirectory("/a/b/e/f/g", "/a/b", "t") + if _, err := dm.FindDirectory("/a/b/e/f/g"); err == nil { + t.Fatal("/a/b/e/f/g should not exist any more after moving") + } + if _, err := dm.FindDirectory("/a/b/t"); err != nil { + t.Fatal("/a/b/t should exist after moving") + } + if _, err := dm.FindDirectory("/a/b/g"); err == nil { + t.Fatal("/a/b/g should not exist after moving") + } + dm.MoveUnderDirectory("/a/b/e/f", "/a/b", "") + if _, err := dm.FindDirectory("/a/b/f"); err != nil { + t.Fatal("/a/b/g should not exist after moving") + } + dm.MakeDirectory("/a/b/g/h/i") + dm.DeleteDirectory("/a/b/e/f") + dm.DeleteDirectory("/a/b/e") + dirNames, _ := dm.ListDirectories("/a/b/e") + for _, v := range dirNames { + println("sub1 dir:", v.Name, "id", v.Id) + } + dm.logFile.Close() + + var path []string + printTree(dm.Root, path) + + dm2, e := NewDirectoryManagerInMap("/tmp/dir.log") + if e != nil { + println("load error", e.Error()) + } + if !compare(dm.Root, dm2.Root) { + t.Fatal("restored dir not the same!") + } + printTree(dm2.Root, path) +} + +func printTree(node *DirectoryEntryInMap, path []string) { + println(strings.Join(path, "/") + "/" + node.Name) + path = append(path, node.Name) + for _, v := range node.subDirectories { + printTree(v, path) + } +} + +func compare(root1 *DirectoryEntryInMap, root2 *DirectoryEntryInMap) bool { + if len(root1.subDirectories) != len(root2.subDirectories) { + return false + } + if root1.Name != root2.Name { + return false + } + if root1.Id != root2.Id { + return false + } + if !(root1.Parent == nil && root2.Parent == nil) { + if root1.Parent.Id != root2.Parent.Id { + return false + } + } + for k, v := range root1.subDirectories { + if !compare(v, root2.subDirectories[k]) { + return false + } + } + return true +} diff --git a/weed/filer/embedded_filer/filer_embedded.go b/weed/filer/embedded_filer/filer_embedded.go new file mode 100644 index 000000000..27299eb40 --- /dev/null +++ b/weed/filer/embedded_filer/filer_embedded.go @@ -0,0 +1,141 @@ +package embedded_filer + +import ( + "errors" + "fmt" + "path/filepath" + "strings" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/operation" +) + +type FilerEmbedded struct { + master string + directories *DirectoryManagerInMap + files *FileListInLevelDb +} + +func NewFilerEmbedded(master string, dir string) (filer *FilerEmbedded, err error) { + dm, de := NewDirectoryManagerInMap(filepath.Join(dir, "dir.log")) + if de != nil { + return nil, de + } + fl, fe := NewFileListInLevelDb(dir) + if fe != nil { + return nil, fe + } + filer = &FilerEmbedded{ + master: master, + directories: dm, + files: fl, + } + return +} + +func (filer *FilerEmbedded) CreateFile(filePath string, fid string) (err error) { + dir, file := filepath.Split(filePath) + dirId, e := filer.directories.MakeDirectory(dir) + if e != nil { + return e + } + return filer.files.CreateFile(dirId, file, fid) +} +func (filer *FilerEmbedded) FindFile(filePath string) (fid string, err error) { + dir, file := filepath.Split(filePath) + dirId, e := filer.directories.FindDirectory(dir) + if e != nil { + return "", e + } + return filer.files.FindFile(dirId, file) +} +func (filer *FilerEmbedded) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + return filer.directories.FindDirectory(dirPath) +} +func (filer *FilerEmbedded) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { + return filer.directories.ListDirectories(dirPath) +} +func (filer *FilerEmbedded) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + dirId, e := filer.directories.FindDirectory(dirPath) + if e != nil { + return nil, e + } + return filer.files.ListFiles(dirId, lastFileName, limit), nil +} +func (filer *FilerEmbedded) DeleteDirectory(dirPath string, recursive bool) (err error) { + dirId, e := filer.directories.FindDirectory(dirPath) + if e != nil { + return e + } + if sub_dirs, sub_err := filer.directories.ListDirectories(dirPath); sub_err == nil { + if len(sub_dirs) > 0 && !recursive { + return fmt.Errorf("Fail to delete directory %s: %d sub directories found!", dirPath, len(sub_dirs)) + } + for _, sub := range sub_dirs { + if delete_sub_err := filer.DeleteDirectory(filepath.Join(dirPath, sub.Name), recursive); delete_sub_err != nil { + return delete_sub_err + } + } + } + list := filer.files.ListFiles(dirId, "", 100) + if len(list) != 0 && !recursive { + if !recursive { + return fmt.Errorf("Fail to delete non-empty directory %s!", dirPath) + } + } + for { + if len(list) == 0 { + return filer.directories.DeleteDirectory(dirPath) + } + var fids []string + for _, fileEntry := range list { + fids = append(fids, string(fileEntry.Id)) + } + if result_list, delete_file_err := operation.DeleteFiles(filer.master, fids); delete_file_err != nil { + return delete_file_err + } else { + if len(result_list.Errors) > 0 { + return errors.New(strings.Join(result_list.Errors, "\n")) + } + } + lastFile := list[len(list)-1] + list = filer.files.ListFiles(dirId, lastFile.Name, 100) + } + +} + +func (filer *FilerEmbedded) DeleteFile(filePath string) (fid string, err error) { + dir, file := filepath.Split(filePath) + dirId, e := filer.directories.FindDirectory(dir) + if e != nil { + return "", e + } + return filer.files.DeleteFile(dirId, file) +} + +/* +Move a folder or a file, with 4 Use cases: +mv fromDir toNewDir +mv fromDir toOldDir +mv fromFile toDir +mv fromFile toFile +*/ +func (filer *FilerEmbedded) Move(fromPath string, toPath string) error { + if _, dir_err := filer.FindDirectory(fromPath); dir_err == nil { + if _, err := filer.FindDirectory(toPath); err == nil { + // move folder under an existing folder + return filer.directories.MoveUnderDirectory(fromPath, toPath, "") + } + // move folder to a new folder + return filer.directories.MoveUnderDirectory(fromPath, filepath.Dir(toPath), filepath.Base(toPath)) + } + if fid, file_err := filer.DeleteFile(fromPath); file_err == nil { + if _, err := filer.FindDirectory(toPath); err == nil { + // move file under an existing folder + return filer.CreateFile(filepath.Join(toPath, filepath.Base(fromPath)), fid) + } + // move to a folder with new name + return filer.CreateFile(toPath, fid) + } + return fmt.Errorf("File %s is not found!", fromPath) +} diff --git a/weed/filer/embedded_filer/files_in_leveldb.go b/weed/filer/embedded_filer/files_in_leveldb.go new file mode 100644 index 000000000..19f6dd7e8 --- /dev/null +++ b/weed/filer/embedded_filer/files_in_leveldb.go @@ -0,0 +1,85 @@ +package embedded_filer + +import ( + "bytes" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" +) + +/* +The entry in level db has this format: + key: genKey(dirId, fileName) + value: []byte(fid) +And genKey(dirId, fileName) use first 4 bytes to store dirId, and rest for fileName +*/ + +type FileListInLevelDb struct { + db *leveldb.DB +} + +func NewFileListInLevelDb(dir string) (fl *FileListInLevelDb, err error) { + fl = &FileListInLevelDb{} + if fl.db, err = leveldb.OpenFile(dir, nil); err != nil { + return + } + return +} + +func genKey(dirId filer.DirectoryId, fileName string) []byte { + ret := make([]byte, 0, 4+len(fileName)) + for i := 3; i >= 0; i-- { + ret = append(ret, byte(dirId>>(uint(i)*8))) + } + ret = append(ret, []byte(fileName)...) + return ret +} + +func (fl *FileListInLevelDb) CreateFile(dirId filer.DirectoryId, fileName string, fid string) (err error) { + glog.V(4).Infoln("directory", dirId, "fileName", fileName, "fid", fid) + return fl.db.Put(genKey(dirId, fileName), []byte(fid), nil) +} +func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { + if fid, err = fl.FindFile(dirId, fileName); err != nil { + if err == leveldb.ErrNotFound { + return "", nil + } + return + } + err = fl.db.Delete(genKey(dirId, fileName), nil) + return fid, err +} +func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { + data, e := fl.db.Get(genKey(dirId, fileName), nil) + if e != nil { + return "", e + } + return string(data), nil +} +func (fl *FileListInLevelDb) ListFiles(dirId filer.DirectoryId, lastFileName string, limit int) (files []filer.FileEntry) { + glog.V(4).Infoln("directory", dirId, "lastFileName", lastFileName, "limit", limit) + dirKey := genKey(dirId, "") + iter := fl.db.NewIterator(&util.Range{Start: genKey(dirId, lastFileName)}, nil) + limitCounter := 0 + for iter.Next() { + key := iter.Key() + if !bytes.HasPrefix(key, dirKey) { + break + } + fileName := string(key[len(dirKey):]) + if fileName == lastFileName { + continue + } + limitCounter++ + if limit > 0 { + if limitCounter > limit { + break + } + } + files = append(files, filer.FileEntry{Name: fileName, Id: filer.FileId(string(iter.Value()))}) + } + iter.Release() + return +} diff --git a/weed/filer/filer.go b/weed/filer/filer.go new file mode 100644 index 000000000..fd23e119c --- /dev/null +++ b/weed/filer/filer.go @@ -0,0 +1,28 @@ +package filer + +type FileId string //file id in SeaweedFS + +type FileEntry struct { + Name string `json:"name,omitempty"` //file name without path + Id FileId `json:"fid,omitempty"` +} + +type DirectoryId int32 + +type DirectoryEntry struct { + Name string //dir name without path + Id DirectoryId +} + +type Filer interface { + CreateFile(fullFileName string, fid string) (err error) + FindFile(fullFileName string) (fid string, err error) + DeleteFile(fullFileName string) (fid string, err error) + + //Optional functions. embedded filer support these + FindDirectory(dirPath string) (dirId DirectoryId, err error) + ListDirectories(dirPath string) (dirs []DirectoryEntry, err error) + ListFiles(dirPath string, lastFileName string, limit int) (files []FileEntry, err error) + DeleteDirectory(dirPath string, recursive bool) (err error) + Move(fromPath string, toPath string) (err error) +} diff --git a/weed/filer/flat_namespace/flat_namespace_filer.go b/weed/filer/flat_namespace/flat_namespace_filer.go new file mode 100644 index 000000000..c20fd2521 --- /dev/null +++ b/weed/filer/flat_namespace/flat_namespace_filer.go @@ -0,0 +1,50 @@ +package flat_namespace + +import ( + "errors" + + "github.com/chrislusf/seaweedfs/weed/filer" +) + +type FlatNamespaceFiler struct { + master string + store FlatNamespaceStore +} + +var ( + ErrNotImplemented = errors.New("Not Implemented for flat namespace meta data store") +) + +func NewFlatNamespaceFiler(master string, store FlatNamespaceStore) *FlatNamespaceFiler { + return &FlatNamespaceFiler{ + master: master, + store: store, + } +} + +func (filer *FlatNamespaceFiler) CreateFile(fullFileName string, fid string) (err error) { + return filer.store.Put(fullFileName, fid) +} +func (filer *FlatNamespaceFiler) FindFile(fullFileName string) (fid string, err error) { + return filer.store.Get(fullFileName) +} +func (filer *FlatNamespaceFiler) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) { + return 0, ErrNotImplemented +} +func (filer *FlatNamespaceFiler) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) { + return nil, ErrNotImplemented +} +func (filer *FlatNamespaceFiler) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) { + return nil, ErrNotImplemented +} +func (filer *FlatNamespaceFiler) DeleteDirectory(dirPath string, recursive bool) (err error) { + return ErrNotImplemented +} + +func (filer *FlatNamespaceFiler) DeleteFile(fullFileName string) (fid string, err error) { + return filer.store.Delete(fullFileName) +} + +func (filer *FlatNamespaceFiler) Move(fromPath string, toPath string) error { + return ErrNotImplemented +} diff --git a/weed/filer/flat_namespace/flat_namespace_store.go b/weed/filer/flat_namespace/flat_namespace_store.go new file mode 100644 index 000000000..832b70e40 --- /dev/null +++ b/weed/filer/flat_namespace/flat_namespace_store.go @@ -0,0 +1,9 @@ +package flat_namespace + +import () + +type FlatNamespaceStore interface { + Put(fullFileName string, fid string) (err error) + Get(fullFileName string) (fid string, err error) + Delete(fullFileName string) (fid string, err error) +} diff --git a/weed/filer/redis_store/redis_store.go b/weed/filer/redis_store/redis_store.go new file mode 100644 index 000000000..939172557 --- /dev/null +++ b/weed/filer/redis_store/redis_store.go @@ -0,0 +1,48 @@ +package redis_store + +import ( + redis "gopkg.in/redis.v2" +) + +type RedisStore struct { + Client *redis.Client +} + +func NewRedisStore(hostPort string, password string, database int) *RedisStore { + client := redis.NewTCPClient(&redis.Options{ + Addr: hostPort, + Password: password, + DB: int64(database), + }) + return &RedisStore{Client: client} +} + +func (s *RedisStore) Get(fullFileName string) (fid string, err error) { + fid, err = s.Client.Get(fullFileName).Result() + if err == redis.Nil { + err = nil + } + return fid, err +} +func (s *RedisStore) Put(fullFileName string, fid string) (err error) { + _, err = s.Client.Set(fullFileName, fid).Result() + if err == redis.Nil { + err = nil + } + return err +} + +// Currently the fid is not returned +func (s *RedisStore) Delete(fullFileName string) (fid string, err error) { + _, err = s.Client.Del(fullFileName).Result() + if err == redis.Nil { + err = nil + } + return "", err +} + +func (s *RedisStore) Close() { + if s.Client != nil { + s.Client.Close() + } +} |
