diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-10-17 04:22:42 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-10-17 04:22:42 -0700 |
| commit | cca62fdb30f207c2a0c703412b64507042a0aadf (patch) | |
| tree | 76a205156fe9a964bfe313fd244e419cbc571181 /weed/server/filer_grpc_server_rename.go | |
| parent | 004e56c1a6b0f0efb022ec07e93b14e9d7baaef9 (diff) | |
| download | seaweedfs-cca62fdb30f207c2a0c703412b64507042a0aadf.tar.xz seaweedfs-cca62fdb30f207c2a0c703412b64507042a0aadf.zip | |
mount: streaming renaming folders
Diffstat (limited to 'weed/server/filer_grpc_server_rename.go')
| -rw-r--r-- | weed/server/filer_grpc_server_rename.go | 89 |
1 files changed, 82 insertions, 7 deletions
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 8a11c91e3..278ecadbd 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path/filepath" + "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -33,7 +34,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) } - moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, req.Signatures) + moveErr := fs.moveEntry(ctx, nil, oldParent, oldEntry, newParent, req.NewName, req.Signatures) if moveErr != nil { fs.filer.RollbackTransaction(ctx) return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) @@ -47,11 +48,49 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return &filer_pb.AtomicRenameEntryResponse{}, nil } -func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { +func (fs *FilerServer) StreamRenameEntry(req *filer_pb.StreamRenameEntryRequest, stream filer_pb.SeaweedFiler_StreamRenameEntryServer) (err error) { - if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error { + glog.V(1).Infof("StreamRenameEntry %v", req) + + oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory)) + newParent := util.FullPath(filepath.ToSlash(req.NewDirectory)) + + if err := fs.filer.CanRename(oldParent, newParent); err != nil { + return err + } + + ctx := context.Background() + + ctx, err = fs.filer.BeginTransaction(ctx) + if err != nil { + return err + } + + oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName)) + if err != nil { + fs.filer.RollbackTransaction(ctx) + return fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) + } + + moveErr := fs.moveEntry(ctx, stream, oldParent, oldEntry, newParent, req.NewName, req.Signatures) + if moveErr != nil { + fs.filer.RollbackTransaction(ctx) + return fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) + } else { + if commitError := fs.filer.CommitTransaction(ctx); commitError != nil { + fs.filer.RollbackTransaction(ctx) + return fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError) + } + } + + return nil +} + +func (fs *FilerServer) moveEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { + + if err := fs.moveSelfEntry(ctx, stream, oldParent, entry, newParent, newName, func() error { if entry.IsDirectory() { - if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, signatures); err != nil { + if err := fs.moveFolderSubEntries(ctx, stream, oldParent, entry, newParent, newName, signatures); err != nil { return err } } @@ -63,7 +102,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e return nil } -func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { +func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { currentDirPath := oldParent.Child(entry.Name()) newDirPath := newParent.Child(newName) @@ -84,7 +123,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. for _, item := range entries { lastFileName = item.Name() // println("processing", lastFileName) - err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), signatures) + err := fs.moveEntry(ctx, stream, currentDirPath, item, newDirPath, item.Name(), signatures) if err != nil { return err } @@ -96,7 +135,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. return nil } -func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error { +func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error { oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName) @@ -118,6 +157,24 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil { return createErr } + if stream != nil { + if err := stream.Send(&filer_pb.StreamRenameEntryResponse{ + Directory: string(newParent), + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: entry.Name(), + }, + NewEntry: newEntry.ToProtoEntry(), + DeleteChunks: false, + NewParentPath: string(newParent), + IsFromOtherCluster: false, + Signatures: nil, + }, + TsNs: time.Now().UnixNano(), + }); err != nil { + return err + } + } if moveFolderSubEntries != nil { if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { @@ -130,6 +187,24 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat if deleteErr != nil { return deleteErr } + if stream != nil { + if err := stream.Send(&filer_pb.StreamRenameEntryResponse{ + Directory: string(oldParent), + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: entry.Name(), + }, + NewEntry: nil, + DeleteChunks: false, + NewParentPath: "", + IsFromOtherCluster: false, + Signatures: nil, + }, + TsNs: time.Now().UnixNano(), + }); err != nil { + return err + } + } return nil |
