aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_remote_sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_remote_sync.go')
-rw-r--r--weed/command/filer_remote_sync.go38
1 files changed, 23 insertions, 15 deletions
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index c2f97cc6c..8d2719660 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -72,13 +73,6 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
dir := *remoteSyncOptions.dir
filerAddress := *remoteSyncOptions.filerAddress
- // read filer remote storage mount mappings
- _, _, remoteStorageMountLocation, storageConf, detectErr := filer.DetectMountInfo(grpcDialOption, filerAddress, dir)
- if detectErr != nil {
- fmt.Printf("read mount info: %v", detectErr)
- return false
- }
-
filerSource := &source.FilerSource{}
filerSource.DoInitialize(
filerAddress,
@@ -89,7 +83,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
fmt.Printf("synchronize %s to remote storage...\n", dir)
util.RetryForever("filer.remote.sync "+dir, func() error {
- return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
+ return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
}, func(err error) bool {
if err != nil {
glog.Errorf("synchronize %s: %v", dir, err)
@@ -100,7 +94,13 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
return true
}
-func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
+func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
+
+ // read filer remote storage mount mappings
+ _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir)
+ if detectErr != nil {
+ return fmt.Errorf("read mount info: %v", detectErr)
+ }
dirHash := util.HashStringToLong(mountedDir)
@@ -115,11 +115,15 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
}
lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
- if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
- lastOffsetTs = time.Unix(0, lastOffsetTsNs)
- glog.V(0).Infof("resume from %v", lastOffsetTs)
+ if mountedDirEntry != nil {
+ if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
+ lastOffsetTs = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resume from %v", lastOffsetTs)
+ } else {
+ lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
+ }
} else {
- lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
+ lastOffsetTs = time.Now()
}
} else {
lastOffsetTs = time.Now().Add(-*option.timeAgo)
@@ -160,6 +164,10 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
if message.OldEntry != nil && message.NewEntry == nil {
glog.V(2).Infof("delete: %+v", resp)
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ if message.OldEntry.IsDirectory {
+ glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
+ return client.RemoveDirectory(dest)
+ }
glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
return client.DeleteFile(dest)
}
@@ -206,10 +214,10 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
"filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
-func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
+func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
source := string(sourcePath[len(mountDir):])
dest := util.FullPath(remoteMountLocation.Path).Child(source)
- return &filer_pb.RemoteStorageLocation{
+ return &remote_pb.RemoteStorageLocation{
Name: remoteMountLocation.Name,
Bucket: remoteMountLocation.Bucket,
Path: string(dest),