From cca62fdb30f207c2a0c703412b64507042a0aadf Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 17 Oct 2021 04:22:42 -0700 Subject: mount: streaming renaming folders --- weed/filesys/dir_rename.go | 117 ++++++++++++++------------------------------- 1 file changed, 37 insertions(+), 80 deletions(-) (limited to 'weed/filesys/dir_rename.go') diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index dd76577b0..d2288e3bd 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -2,12 +2,10 @@ package filesys import ( "context" - "fmt" "github.com/chrislusf/seaweedfs/weed/filer" - "math" - "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" + "io" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -23,19 +21,12 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) - // find local old entry - oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath) - if err != nil { - glog.Errorf("dir Rename can not find source %s : %v", oldPath, err) - return fuse.ENOENT - } - // update remote filer - err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - request := &filer_pb.AtomicRenameEntryRequest{ + request := &filer_pb.StreamRenameEntryRequest{ OldDirectory: dir.FullPath(), OldName: req.OldName, NewDirectory: newDir.FullPath(), @@ -43,12 +34,28 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector Signatures: []int32{dir.wfs.signature}, } - _, err := client.AtomicRenameEntry(ctx, request) + stream, err := client.StreamRenameEntry(ctx, request) if err != nil { glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err) return fuse.EXDEV } + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + if err = dir.handleRenameResponse(ctx, resp); err != nil { + return err + } + + } + return nil }) @@ -57,23 +64,25 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector return fuse.EIO } - err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName) - if err != nil { - glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - return nil } -func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error { - oldName := entry.Name() +func (dir *Dir) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamRenameEntryResponse) error { + // comes from filer StreamRenameEntry, can only be create or delete entry + + if resp.EventNotification.NewEntry != nil { + // with new entry, the old entry name also exists. This is the first step to create new entry + newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry) + if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil { + return err + } - oldPath := oldParent.Child(oldName) - newPath := newParent.Child(newName) - if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error { + oldParent, newParent := util.FullPath(resp.Directory), util.FullPath(resp.EventNotification.NewParentPath) + oldName, newName := resp.EventNotification.OldEntry.Name, resp.EventNotification.NewEntry.Name + oldPath := oldParent.Child(oldName) + newPath := newParent.Child(newName) oldFsNode := NodeWithId(oldPath.AsInode()) newFsNode := NodeWithId(newPath.AsInode()) newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode())) @@ -110,65 +119,13 @@ func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *f } dir.wfs.handlesLock.Unlock() - if entry.IsDirectory() { - if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil { - return err - } + }else if resp.EventNotification.OldEntry != nil { + // without new entry, only old entry name exists. This is the second step to delete old entry + if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil { + return err } - return nil - }); err != nil { - return fmt.Errorf("fail to move %s => %s: %v", oldPath, newPath, err) } return nil -} - -func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error { - currentDirPath := oldParent.Child(oldName) - newDirPath := newParent.Child(newName) - - glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath) - - var moveErr error - listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool { - moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name()) - if moveErr != nil { - return false - } - return true - }) - if listErr != nil { - return listErr - } - if moveErr != nil { - return moveErr - } - - return nil -} - -func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error { - - newPath := newParent.Child(newName) - oldPath := oldParent.Child(entry.Name()) - - entry.FullPath = newPath - if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil { - glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - - if moveFolderSubEntries != nil { - if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { - return moveChildrenErr - } - } - - if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil { - glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err) - return fuse.EIO - } - - return nil } -- cgit v1.2.3