aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/filer_remote_sync.go54
1 files changed, 30 insertions, 24 deletions
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index fb88652c1..e7088fc6c 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -105,30 +105,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return fmt.Errorf("read mount info: %v", detectErr)
}
- // 1. specified by timeAgo
- // 2. last offset timestamp for this directory
- // 3. directory creation time
- var lastOffsetTs time.Time
- if *option.timeAgo == 0 {
- mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
- if err != nil {
- return fmt.Errorf("lookup %s: %v", mountedDir, err)
- }
-
- lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
- if mountedDirEntry != nil {
- if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
- lastOffsetTs = time.Unix(0, lastOffsetTsNs)
- glog.V(0).Infof("resume from %v", lastOffsetTs)
- } else {
- lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
- }
- } else {
- lastOffsetTs = time.Now()
- }
- } else {
- lastOffsetTs = time.Now().Add(-*option.timeAgo)
- }
+ lastOffsetTs := collectLastSyncOffset(option, mountedDir)
client, err := remote_storage.GetRemoteStorage(remoteStorage)
if err != nil {
@@ -251,6 +228,35 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
+func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) (time.Time) {
+ // 1. specified by timeAgo
+ // 2. last offset timestamp for this directory
+ // 3. directory creation time
+ var lastOffsetTs time.Time
+ if *option.timeAgo == 0 {
+ mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
+ if err != nil {
+ glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
+ return time.Now()
+ }
+
+ lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
+ if mountedDirEntry != nil {
+ if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
+ lastOffsetTs = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resume from %v", lastOffsetTs)
+ } else {
+ lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
+ }
+ } else {
+ lastOffsetTs = time.Now()
+ }
+ } else {
+ lastOffsetTs = time.Now().Add(-*option.timeAgo)
+ }
+ return lastOffsetTs
+}
+
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
source := string(sourcePath[len(mountDir):])
dest := util.FullPath(remoteMountLocation.Path).Child(source)