diff options
Diffstat (limited to 'weed/command/filer_remote_sync.go')
| -rw-r--r-- | weed/command/filer_remote_sync.go | 38 |
1 files changed, 23 insertions, 15 deletions
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index c2f97cc6c..8d2719660 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" @@ -72,13 +73,6 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { dir := *remoteSyncOptions.dir filerAddress := *remoteSyncOptions.filerAddress - // read filer remote storage mount mappings - _, _, remoteStorageMountLocation, storageConf, detectErr := filer.DetectMountInfo(grpcDialOption, filerAddress, dir) - if detectErr != nil { - fmt.Printf("read mount info: %v", detectErr) - return false - } - filerSource := &source.FilerSource{} filerSource.DoInitialize( filerAddress, @@ -89,7 +83,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { fmt.Printf("synchronize %s to remote storage...\n", dir) util.RetryForever("filer.remote.sync "+dir, func() error { - return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation) + return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) }, func(err error) bool { if err != nil { glog.Errorf("synchronize %s: %v", dir, err) @@ -100,7 +94,13 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { return true } -func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error { +func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error { + + // read filer remote storage mount mappings + _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir) + if detectErr != nil { + return fmt.Errorf("read mount info: %v", detectErr) + } dirHash := util.HashStringToLong(mountedDir) @@ -115,11 +115,15 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour } lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash)) - if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { - lastOffsetTs = time.Unix(0, lastOffsetTsNs) - glog.V(0).Infof("resume from %v", lastOffsetTs) + if mountedDirEntry != nil { + if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { + lastOffsetTs = time.Unix(0, lastOffsetTsNs) + glog.V(0).Infof("resume from %v", lastOffsetTs) + } else { + lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) + } } else { - lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) + lastOffsetTs = time.Now() } } else { lastOffsetTs = time.Now().Add(-*option.timeAgo) @@ -160,6 +164,10 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour if message.OldEntry != nil && message.NewEntry == nil { glog.V(2).Infof("delete: %+v", resp) dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + if message.OldEntry.IsDirectory { + glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest)) + return client.RemoveDirectory(dest) + } glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest)) return client.DeleteFile(dest) } @@ -206,10 +214,10 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) } -func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation { +func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { source := string(sourcePath[len(mountDir):]) dest := util.FullPath(remoteMountLocation.Path).Child(source) - return &filer_pb.RemoteStorageLocation{ + return &remote_pb.RemoteStorageLocation{ Name: remoteMountLocation.Name, Bucket: remoteMountLocation.Bucket, Path: string(dest), |
