aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_rename.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-17 04:22:42 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-17 04:22:42 -0700
commitcca62fdb30f207c2a0c703412b64507042a0aadf (patch)
tree76a205156fe9a964bfe313fd244e419cbc571181 /weed/server/filer_grpc_server_rename.go
parent004e56c1a6b0f0efb022ec07e93b14e9d7baaef9 (diff)
downloadseaweedfs-cca62fdb30f207c2a0c703412b64507042a0aadf.tar.xz
seaweedfs-cca62fdb30f207c2a0c703412b64507042a0aadf.zip
mount: streaming renaming folders
Diffstat (limited to 'weed/server/filer_grpc_server_rename.go')
-rw-r--r--weed/server/filer_grpc_server_rename.go89
1 files changed, 82 insertions, 7 deletions
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 8a11c91e3..278ecadbd 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"path/filepath"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -33,7 +34,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
}
- moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
+ moveErr := fs.moveEntry(ctx, nil, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
if moveErr != nil {
fs.filer.RollbackTransaction(ctx)
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
@@ -47,11 +48,49 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return &filer_pb.AtomicRenameEntryResponse{}, nil
}
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
+func (fs *FilerServer) StreamRenameEntry(req *filer_pb.StreamRenameEntryRequest, stream filer_pb.SeaweedFiler_StreamRenameEntryServer) (err error) {
- if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
+ glog.V(1).Infof("StreamRenameEntry %v", req)
+
+ oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
+ newParent := util.FullPath(filepath.ToSlash(req.NewDirectory))
+
+ if err := fs.filer.CanRename(oldParent, newParent); err != nil {
+ return err
+ }
+
+ ctx := context.Background()
+
+ ctx, err = fs.filer.BeginTransaction(ctx)
+ if err != nil {
+ return err
+ }
+
+ oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName))
+ if err != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
+ }
+
+ moveErr := fs.moveEntry(ctx, stream, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
+ if moveErr != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
+ } else {
+ if commitError := fs.filer.CommitTransaction(ctx); commitError != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError)
+ }
+ }
+
+ return nil
+}
+
+func (fs *FilerServer) moveEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
+
+ if err := fs.moveSelfEntry(ctx, stream, oldParent, entry, newParent, newName, func() error {
if entry.IsDirectory() {
- if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, signatures); err != nil {
+ if err := fs.moveFolderSubEntries(ctx, stream, oldParent, entry, newParent, newName, signatures); err != nil {
return err
}
}
@@ -63,7 +102,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e
return nil
}
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
currentDirPath := oldParent.Child(entry.Name())
newDirPath := newParent.Child(newName)
@@ -84,7 +123,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
for _, item := range entries {
lastFileName = item.Name()
// println("processing", lastFileName)
- err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), signatures)
+ err := fs.moveEntry(ctx, stream, currentDirPath, item, newDirPath, item.Name(), signatures)
if err != nil {
return err
}
@@ -96,7 +135,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return nil
}
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error {
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error {
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -118,6 +157,24 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil {
return createErr
}
+ if stream != nil {
+ if err := stream.Send(&filer_pb.StreamRenameEntryResponse{
+ Directory: string(newParent),
+ EventNotification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{
+ Name: entry.Name(),
+ },
+ NewEntry: newEntry.ToProtoEntry(),
+ DeleteChunks: false,
+ NewParentPath: string(newParent),
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ },
+ TsNs: time.Now().UnixNano(),
+ }); err != nil {
+ return err
+ }
+ }
if moveFolderSubEntries != nil {
if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
@@ -130,6 +187,24 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
if deleteErr != nil {
return deleteErr
}
+ if stream != nil {
+ if err := stream.Send(&filer_pb.StreamRenameEntryResponse{
+ Directory: string(oldParent),
+ EventNotification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{
+ Name: entry.Name(),
+ },
+ NewEntry: nil,
+ DeleteChunks: false,
+ NewParentPath: "",
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ },
+ TsNs: time.Now().UnixNano(),
+ }); err != nil {
+ return err
+ }
+ }
return nil