diff options
Diffstat (limited to 'weed/filer2/filer.go')
| -rw-r--r-- | weed/filer2/filer.go | 150 |
1 files changed, 103 insertions, 47 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 0592c7848..848e87b2d 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -3,9 +3,11 @@ package filer2 import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2/embedded" "github.com/karlseguin/ccache" "strings" + "path/filepath" + "time" + "os" ) type Filer struct { @@ -21,78 +23,132 @@ func NewFiler(master string) *Filer { } } -func NewEmbeddedFiler(master string, dir string) (*Filer, error) { - _, err := embedded.NewEmbeddedStore(dir) - if err != nil { - return nil, fmt.Errorf("failed to create embedded filer store: %v", err) - } - return &Filer{ - master: master, - // store: store, - }, nil +func (f *Filer) SetStore(store FilerStore) () { + f.store = store } -func (f *Filer) CreateEntry(entry Entry) (error) { - /* - 1. recursively ensure parent directory is created. - 2. get current parent directory, add link, - 3. add the file entry - */ +func (f *Filer) DisableDirectoryCache() () { + f.directoryCache = nil +} - recursivelyEnsureDirectory(entry.Dir, func(parent, name string) error { - return nil - }) +func (f *Filer) CreateEntry(entry *Entry) (error) { + + dirParts := strings.Split(string(entry.FullPath), "/") + + // fmt.Printf("directory parts: %+v\n", dirParts) + + var lastDirectoryEntry *Entry + + for i := 1; i < len(dirParts); i++ { + dirPath := "/" + filepath.Join(dirParts[:i]...) + // fmt.Printf("%d directory: %+v\n", i, dirPath) + + dirFound := false + + // first check local cache + dirEntry := f.cacheGetDirectory(dirPath) + + // not found, check the store directly + if dirEntry == nil { + var dirFindErr error + dirFound, dirEntry, dirFindErr = f.FindEntry(FullPath(dirPath)) + if dirFindErr != nil { + return fmt.Errorf("findDirectory %s: %v", dirPath, dirFindErr) + } + } + + // no such existing directory + if !dirFound { + + // create the directory + now := time.Now() + + dirEntry = &Entry{ + FullPath: FullPath(dirPath), + Attr: Attr{ + Mtime: now, + Crtime: now, + Mode: os.ModeDir | 0660, + Uid: entry.Uid, + Gid: entry.Gid, + Nlink: 2, + }, + } + + mkdirErr := f.store.InsertEntry(dirEntry) + if mkdirErr != nil { + return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) + } + } + + // cache the directory entry + f.cacheSetDirectory(dirPath, dirEntry, i) + + // remember the direct parent directory entry + if i == len(dirParts)-1 { + lastDirectoryEntry = dirEntry + } + + } - return f.store.CreateEntry(entry) + if lastDirectoryEntry == nil { + return fmt.Errorf("parent folder not found: %v", entry.FullPath) + } + + if !hasWritePermission(lastDirectoryEntry, entry) { + return fmt.Errorf("no write permission in folder %v", lastDirectoryEntry.FullPath) + } + + if err := f.store.InsertEntry(entry); err != nil { + return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) + } + if err := f.store.AddDirectoryLink(lastDirectoryEntry, 1); err != nil { + return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) + } + + return nil } func (f *Filer) AppendFileChunk(p FullPath, c FileChunk) (err error) { return f.store.AppendFileChunk(p, c) } -func (f *Filer) FindEntry(p FullPath) (found bool, fileEntry Entry, err error) { +func (f *Filer) FindEntry(p FullPath) (found bool, entry *Entry, err error) { return f.store.FindEntry(p) } -func (f *Filer) DeleteEntry(p FullPath) (fileEntry Entry, err error) { +func (f *Filer) DeleteEntry(p FullPath) (fileEntry *Entry, err error) { return f.store.DeleteEntry(p) } -func (f *Filer) ListDirectoryEntries(p FullPath) ([]Entry, error) { +func (f *Filer) ListDirectoryEntries(p FullPath) ([]*Entry, error) { + if strings.HasSuffix(string(p), "/") { + p = p[0:len(p)-1] + } return f.store.ListDirectoryEntries(p) } -func (f *Filer) UpdateEntry(entry Entry) (error) { - return f.store.UpdateEntry(entry) -} - -func recursivelyEnsureDirectory(fullPath string, fn func(parent, name string) error) (error) { - if strings.HasSuffix(fullPath, "/") { - fullPath = fullPath[0:len(fullPath)-1] +func (f *Filer) cacheGetDirectory(dirpath string) (*Entry) { + if f.directoryCache == nil { + return nil } - nextPathEnd := strings.LastIndex(fullPath, "/") - if nextPathEnd < 0 { + item := f.directoryCache.Get(dirpath) + if item == nil { return nil } + return item.Value().(*Entry) +} - dirName := fullPath[nextPathEnd+1:] - parentDirPath := fullPath[0:nextPathEnd] - - if parentDirPath == "" { - parentDirPath = "/" - } +func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) { - if err := recursivelyEnsureDirectory(parentDirPath, fn); err != nil { - return err + if f.directoryCache == nil { + return } - if err := fn(parentDirPath, dirName); err != nil { - return err + minutes := 60 + if level < 10 { + minutes -= level * 6 } - return nil -} - -func (f *Filer) cacheGetDirectory(dirpath string) (error) { - return nil + f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute) } |
