aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-17 04:22:42 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-17 04:22:42 -0700
commitcca62fdb30f207c2a0c703412b64507042a0aadf (patch)
tree76a205156fe9a964bfe313fd244e419cbc571181 /weed/filesys
parent004e56c1a6b0f0efb022ec07e93b14e9d7baaef9 (diff)
downloadseaweedfs-cca62fdb30f207c2a0c703412b64507042a0aadf.tar.xz
seaweedfs-cca62fdb30f207c2a0c703412b64507042a0aadf.zip
mount: streaming renaming folders
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dir_rename.go117
-rw-r--r--weed/filesys/meta_cache/meta_cache.go4
2 files changed, 39 insertions, 82 deletions
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index dd76577b0..d2288e3bd 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -2,12 +2,10 @@ package filesys
import (
"context"
- "fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
- "math"
-
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
+ "io"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -23,19 +21,12 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
- // find local old entry
- oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath)
- if err != nil {
- glog.Errorf("dir Rename can not find source %s : %v", oldPath, err)
- return fuse.ENOENT
- }
-
// update remote filer
- err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- request := &filer_pb.AtomicRenameEntryRequest{
+ request := &filer_pb.StreamRenameEntryRequest{
OldDirectory: dir.FullPath(),
OldName: req.OldName,
NewDirectory: newDir.FullPath(),
@@ -43,12 +34,28 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
Signatures: []int32{dir.wfs.signature},
}
- _, err := client.AtomicRenameEntry(ctx, request)
+ stream, err := client.StreamRenameEntry(ctx, request)
if err != nil {
glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
return fuse.EXDEV
}
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ if err = dir.handleRenameResponse(ctx, resp); err != nil {
+ return err
+ }
+
+ }
+
return nil
})
@@ -57,23 +64,25 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
return fuse.EIO
}
- err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName)
- if err != nil {
- glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err)
- return fuse.EIO
- }
-
return nil
}
-func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
- oldName := entry.Name()
+func (dir *Dir) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamRenameEntryResponse) error {
+ // comes from filer StreamRenameEntry, can only be create or delete entry
+
+ if resp.EventNotification.NewEntry != nil {
+ // with new entry, the old entry name also exists. This is the first step to create new entry
+ newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry)
+ if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil {
+ return err
+ }
- oldPath := oldParent.Child(oldName)
- newPath := newParent.Child(newName)
- if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
+ oldParent, newParent := util.FullPath(resp.Directory), util.FullPath(resp.EventNotification.NewParentPath)
+ oldName, newName := resp.EventNotification.OldEntry.Name, resp.EventNotification.NewEntry.Name
+ oldPath := oldParent.Child(oldName)
+ newPath := newParent.Child(newName)
oldFsNode := NodeWithId(oldPath.AsInode())
newFsNode := NodeWithId(newPath.AsInode())
newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode()))
@@ -110,65 +119,13 @@ func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *f
}
dir.wfs.handlesLock.Unlock()
- if entry.IsDirectory() {
- if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil {
- return err
- }
+ }else if resp.EventNotification.OldEntry != nil {
+ // without new entry, only old entry name exists. This is the second step to delete old entry
+ if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil {
+ return err
}
- return nil
- }); err != nil {
- return fmt.Errorf("fail to move %s => %s: %v", oldPath, newPath, err)
}
return nil
-}
-
-func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error {
- currentDirPath := oldParent.Child(oldName)
- newDirPath := newParent.Child(newName)
-
- glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath)
-
- var moveErr error
- listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool {
- moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
- if moveErr != nil {
- return false
- }
- return true
- })
- if listErr != nil {
- return listErr
- }
- if moveErr != nil {
- return moveErr
- }
-
- return nil
-}
-
-func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
-
- newPath := newParent.Child(newName)
- oldPath := oldParent.Child(entry.Name())
-
- entry.FullPath = newPath
- if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil {
- glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
- return fuse.EIO
- }
-
- if moveFolderSubEntries != nil {
- if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
- return moveChildrenErr
- }
- }
-
- if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil {
- glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
- return fuse.EIO
- }
-
- return nil
}
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 69d1655ee..23384f592 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -2,7 +2,6 @@ package meta_cache
import (
"context"
- "fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/leveldb"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -122,7 +121,8 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
//defer mc.RUnlock()
if !mc.visitedBoundary.HasVisited(dirPath) {
- return fmt.Errorf("unsynchronized dir: %v", dirPath)
+ // if this request comes after renaming, it should be fine
+ glog.Warningf("unsynchronized dir: %v", dirPath)
}
_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {