diff options
Diffstat (limited to 'weed/command/filer_remote_sync.go')
| -rw-r--r-- | weed/command/filer_remote_sync.go | 47 |
1 files changed, 42 insertions, 5 deletions
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 43dc86f33..b751d30b6 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -12,7 +12,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" "google.golang.org/grpc" + "os" + "strings" "time" ) @@ -102,8 +105,6 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return fmt.Errorf("read mount info: %v", detectErr) } - dirHash := util.HashStringToLong(mountedDir) - // 1. specified by timeAgo // 2. last offset timestamp for this directory // 3. directory creation time @@ -114,7 +115,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return fmt.Errorf("lookup %s: %v", mountedDir, err) } - lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash)) + lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir) if mountedDirEntry != nil { if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { lastOffsetTs = time.Unix(0, lastOffsetTsNs) @@ -134,8 +135,44 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return err } + handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry == nil { + return nil + } + if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) + if readErr != nil { + return fmt.Errorf("unmarshal mappings: %v", readErr) + } + if remoteLoc, found := mappings.Mappings[mountedDir]; found { + if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path { + glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc) + } + } else { + glog.V(0).Infof("unmounted %s exiting ...", mountedDir) + os.Exit(0) + } + } + if message.NewEntry.Name == remoteStorage.Name + filer.REMOTE_STORAGE_CONF_SUFFIX { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) + } + remoteStorage = conf + client, err = remote_storage.GetRemoteStorage(remoteStorage) + return err + } + + return nil + } + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification + if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + return handleEtcRemoteChanges(resp) + } + if message.OldEntry == nil && message.NewEntry == nil { return nil } @@ -207,11 +244,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { lastTime := time.Unix(0, lastTsNs) glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) - return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs) + return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs) }) return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync", - mountedDir, nil, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) + mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) } func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { |
