aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_remote_sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_remote_sync.go')
-rw-r--r--weed/command/filer_remote_sync.go47
1 files changed, 42 insertions, 5 deletions
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index 43dc86f33..b751d30b6 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -12,7 +12,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
"google.golang.org/grpc"
+ "os"
+ "strings"
"time"
)
@@ -102,8 +105,6 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return fmt.Errorf("read mount info: %v", detectErr)
}
- dirHash := util.HashStringToLong(mountedDir)
-
// 1. specified by timeAgo
// 2. last offset timestamp for this directory
// 3. directory creation time
@@ -114,7 +115,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return fmt.Errorf("lookup %s: %v", mountedDir, err)
}
- lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
+ lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
if mountedDirEntry != nil {
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
@@ -134,8 +135,44 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return err
}
+ handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if message.NewEntry == nil {
+ return nil
+ }
+ if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
+ mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
+ if readErr != nil {
+ return fmt.Errorf("unmarshal mappings: %v", readErr)
+ }
+ if remoteLoc, found := mappings.Mappings[mountedDir]; found {
+ if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
+ glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
+ }
+ } else {
+ glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
+ os.Exit(0)
+ }
+ }
+ if message.NewEntry.Name == remoteStorage.Name + filer.REMOTE_STORAGE_CONF_SUFFIX {
+ conf := &remote_pb.RemoteConf{}
+ if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
+ return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
+ }
+ remoteStorage = conf
+ client, err = remote_storage.GetRemoteStorage(remoteStorage)
+ return err
+ }
+
+ return nil
+ }
+
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
+ if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
+ return handleEtcRemoteChanges(resp)
+ }
+
if message.OldEntry == nil && message.NewEntry == nil {
return nil
}
@@ -207,11 +244,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
lastTime := time.Unix(0, lastTsNs)
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
- return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
+ return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs)
})
return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
- mountedDir, nil, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
+ mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {