diff options
Diffstat (limited to 'weed/filer/filer.go')
| -rw-r--r-- | weed/filer/filer.go | 147 |
1 files changed, 73 insertions, 74 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 13dedea1e..e59887763 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -134,69 +134,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr return nil } - dirParts := strings.Split(string(entry.FullPath), "/") - - // fmt.Printf("directory parts: %+v\n", dirParts) - - var lastDirectoryEntry *Entry - - for i := 1; i < len(dirParts); i++ { - dirPath := "/" + util.Join(dirParts[:i]...) - // fmt.Printf("%d directory: %+v\n", i, dirPath) - - // check the store directly - glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath)) - - // no such existing directory - if dirEntry == nil { - - // create the directory - now := time.Now() - - dirEntry = &Entry{ - FullPath: util.FullPath(dirPath), - Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | entry.Mode | 0110, - Uid: entry.Uid, - Gid: entry.Gid, - Collection: entry.Collection, - Replication: entry.Replication, - UserName: entry.UserName, - GroupNames: entry.GroupNames, - }, - } - - glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) - mkdirErr := f.Store.InsertEntry(ctx, dirEntry) - if mkdirErr != nil { - if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { - glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) - return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) - } - } else { - f.maybeAddBucket(dirEntry) - f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) - } - - } else if !dirEntry.IsDirectory() { - glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath) - return fmt.Errorf("%s is a file", dirPath) - } - - // remember the direct parent directory entry - if i == len(dirParts)-1 { - lastDirectoryEntry = dirEntry - } - - } - - if lastDirectoryEntry == nil { - glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath) - return fmt.Errorf("parent folder not found: %v", entry.FullPath) - } + oldEntry, _ := f.FindEntry(ctx, entry.FullPath) /* if !hasWritePermission(lastDirectoryEntry, entry) { @@ -206,9 +144,13 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr } */ - oldEntry, _ := f.FindEntry(ctx, entry.FullPath) - if oldEntry == nil { + + dirParts := strings.Split(string(entry.FullPath), "/") + if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { + return err + } + glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name()) if err := f.Store.InsertEntry(ctx, entry); err != nil { glog.Errorf("insert entry %s: %v", entry.FullPath, err) @@ -236,6 +178,65 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr return nil } +func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) { + + if level == 0 { + return nil + } + + dirPath := "/" + util.Join(dirParts[:level]...) + // fmt.Printf("%d directory: %+v\n", i, dirPath) + + // check the store directly + glog.V(4).Infof("find uncached directory: %s", dirPath) + dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath)) + + // no such existing directory + if dirEntry == nil { + + // ensure parent directory + if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil { + return err + } + + // create the directory + now := time.Now() + + dirEntry = &Entry{ + FullPath: util.FullPath(dirPath), + Attr: Attr{ + Mtime: now, + Crtime: now, + Mode: os.ModeDir | entry.Mode | 0110, + Uid: entry.Uid, + Gid: entry.Gid, + Collection: entry.Collection, + Replication: entry.Replication, + UserName: entry.UserName, + GroupNames: entry.GroupNames, + }, + } + + glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) + mkdirErr := f.Store.InsertEntry(ctx, dirEntry) + if mkdirErr != nil { + if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { + glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) + return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) + } + } else { + f.maybeAddBucket(dirEntry) + f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) + } + + } else if !dirEntry.IsDirectory() { + glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath) + return fmt.Errorf("%s is a file", dirPath) + } + + return nil +} + func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) { if oldEntry != nil { entry.Attr.Crtime = oldEntry.Attr.Crtime @@ -280,21 +281,19 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e } -func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) { - listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix) - if listErr != nil { - return listedEntries, expiredCount, "", listErr - } - for _, entry := range listedEntries { - lastFileName = entry.Name() +func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) { + lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { if entry.TtlSec > 0 { if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { f.Store.DeleteOneEntry(ctx, entry) expiredCount++ - continue + return true } } - entries = append(entries, entry) + return eachEntryFunc(entry) + }) + if err != nil { + return expiredCount, lastFileName, err } return } |
