From 97406333a5ecc5b0d2cdaa74ff9901e3100e4bf2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 30 Mar 2019 23:08:29 -0700 Subject: support atomic renaming for mysql/postgres filer store --- weed/filesys/dir_rename.go | 106 +++++---------------------------------------- 1 file changed, 10 insertions(+), 96 deletions(-) (limited to 'weed/filesys/dir_rename.go') diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 8c586eb73..7a415ff82 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -2,13 +2,10 @@ package filesys import ( "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" - "math" - "path/filepath" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error { @@ -17,103 +14,20 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector return dir.wfs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - // find existing entry - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir.Path, - Name: req.OldName, + request := &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: dir.Path, + OldName: req.OldName, + NewDirectory: newDir.Path, + NewName: req.NewName, } - glog.V(4).Infof("find existing directory entry: %v", request) - resp, err := client.LookupDirectoryEntry(ctx, request) + _, err := client.AtomicRenameEntry(ctx, request) if err != nil { - glog.V(3).Infof("renaming find %s/%s: %v", dir.Path, req.OldName, err) - return fuse.ENOENT + return fmt.Errorf("renaming %s/%s => %s/%s: %v", dir.Path, req.OldName, newDir.Path, req.NewName, err) } - entry := resp.Entry + return nil - glog.V(4).Infof("found existing directory entry resp: %+v", resp) - - return moveEntry(ctx, client, dir.Path, entry, newDir.Path, req.NewName) }) } - -func moveEntry(ctx context.Context, client filer_pb.SeaweedFilerClient, oldParent string, entry *filer_pb.Entry, newParent, newName string) error { - if entry.IsDirectory { - currentDirPath := filepath.ToSlash(filepath.Join(oldParent, entry.Name)) - - lastFileName := "" - includeLastFile := false - limit := math.MaxInt32 - for limit > 0 { - request := &filer_pb.ListEntriesRequest{ - Directory: currentDirPath, - StartFromFileName: lastFileName, - InclusiveStartFrom: includeLastFile, - Limit: 1024, - } - glog.V(4).Infof("read directory: %v", request) - resp, err := client.ListEntries(ctx, request) - if err != nil { - glog.V(0).Infof("list %s: %v", oldParent, err) - return fuse.EIO - } - if len(resp.Entries) == 0 { - break - } - - for _, item := range resp.Entries { - lastFileName = item.Name - err := moveEntry(ctx, client, currentDirPath, item, filepath.ToSlash(filepath.Join(newParent, newName)), item.Name) - if err != nil { - return err - } - limit-- - } - if len(resp.Entries) < 1024 { - break - } - } - - } - - // add to new directory - { - request := &filer_pb.CreateEntryRequest{ - Directory: newParent, - Entry: &filer_pb.Entry{ - Name: newName, - IsDirectory: entry.IsDirectory, - Attributes: entry.Attributes, - Chunks: entry.Chunks, - }, - } - - glog.V(1).Infof("create new entry: %v", request) - if _, err := client.CreateEntry(ctx, request); err != nil { - glog.V(0).Infof("renaming create %s/%s: %v", newParent, newName, err) - return fuse.EIO - } - } - - // delete old entry - { - request := &filer_pb.DeleteEntryRequest{ - Directory: oldParent, - Name: entry.Name, - IsDeleteData: false, - } - - glog.V(1).Infof("remove old entry: %v", request) - _, err := client.DeleteEntry(ctx, request) - if err != nil { - glog.V(0).Infof("renaming delete %s/%s: %v", oldParent, entry.Name, err) - return fuse.EIO - } - - } - - return nil - -} -- cgit v1.2.3