diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-10-17 04:22:42 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-10-17 04:22:42 -0700 |
| commit | cca62fdb30f207c2a0c703412b64507042a0aadf (patch) | |
| tree | 76a205156fe9a964bfe313fd244e419cbc571181 /weed/filesys | |
| parent | 004e56c1a6b0f0efb022ec07e93b14e9d7baaef9 (diff) | |
| download | seaweedfs-cca62fdb30f207c2a0c703412b64507042a0aadf.tar.xz seaweedfs-cca62fdb30f207c2a0c703412b64507042a0aadf.zip | |
mount: streaming renaming folders
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir_rename.go | 117 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache.go | 4 |
2 files changed, 39 insertions, 82 deletions
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index dd76577b0..d2288e3bd 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -2,12 +2,10 @@ package filesys import ( "context" - "fmt" "github.com/chrislusf/seaweedfs/weed/filer" - "math" - "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" + "io" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -23,19 +21,12 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) - // find local old entry - oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath) - if err != nil { - glog.Errorf("dir Rename can not find source %s : %v", oldPath, err) - return fuse.ENOENT - } - // update remote filer - err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - request := &filer_pb.AtomicRenameEntryRequest{ + request := &filer_pb.StreamRenameEntryRequest{ OldDirectory: dir.FullPath(), OldName: req.OldName, NewDirectory: newDir.FullPath(), @@ -43,12 +34,28 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector Signatures: []int32{dir.wfs.signature}, } - _, err := client.AtomicRenameEntry(ctx, request) + stream, err := client.StreamRenameEntry(ctx, request) if err != nil { glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err) return fuse.EXDEV } + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + if err = dir.handleRenameResponse(ctx, resp); err != nil { + return err + } + + } + return nil }) @@ -57,23 +64,25 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector return fuse.EIO } - err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName) - if err != nil { - glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - return nil } -func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error { - oldName := entry.Name() +func (dir *Dir) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamRenameEntryResponse) error { + // comes from filer StreamRenameEntry, can only be create or delete entry + + if resp.EventNotification.NewEntry != nil { + // with new entry, the old entry name also exists. This is the first step to create new entry + newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry) + if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil { + return err + } - oldPath := oldParent.Child(oldName) - newPath := newParent.Child(newName) - if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error { + oldParent, newParent := util.FullPath(resp.Directory), util.FullPath(resp.EventNotification.NewParentPath) + oldName, newName := resp.EventNotification.OldEntry.Name, resp.EventNotification.NewEntry.Name + oldPath := oldParent.Child(oldName) + newPath := newParent.Child(newName) oldFsNode := NodeWithId(oldPath.AsInode()) newFsNode := NodeWithId(newPath.AsInode()) newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode())) @@ -110,65 +119,13 @@ func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *f } dir.wfs.handlesLock.Unlock() - if entry.IsDirectory() { - if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil { - return err - } + }else if resp.EventNotification.OldEntry != nil { + // without new entry, only old entry name exists. This is the second step to delete old entry + if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil { + return err } - return nil - }); err != nil { - return fmt.Errorf("fail to move %s => %s: %v", oldPath, newPath, err) } return nil -} - -func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error { - currentDirPath := oldParent.Child(oldName) - newDirPath := newParent.Child(newName) - - glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath) - - var moveErr error - listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool { - moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name()) - if moveErr != nil { - return false - } - return true - }) - if listErr != nil { - return listErr - } - if moveErr != nil { - return moveErr - } - - return nil -} - -func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error { - - newPath := newParent.Child(newName) - oldPath := oldParent.Child(entry.Name()) - - entry.FullPath = newPath - if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil { - glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - - if moveFolderSubEntries != nil { - if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { - return moveChildrenErr - } - } - - if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil { - glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - - return nil } diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index 69d1655ee..23384f592 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -2,7 +2,6 @@ package meta_cache import ( "context" - "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer/leveldb" "github.com/chrislusf/seaweedfs/weed/glog" @@ -122,7 +121,8 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full //defer mc.RUnlock() if !mc.visitedBoundary.HasVisited(dirPath) { - return fmt.Errorf("unsynchronized dir: %v", dirPath) + // if this request comes after renaming, it should be fine + glog.Warningf("unsynchronized dir: %v", dirPath) } _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool { |
