diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-04 04:35:46 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-04 04:35:46 -0700 |
| commit | 4b28c5f6c33f0dd0eed07ad24936b30f76b8d449 (patch) | |
| tree | ec0f4e82c85674cf75b7e3ea4a712febbd3a3adb /weed/command/filer_remote_sync_dir.go | |
| parent | da49d25950095d9517a767a9d8fe9628ef4c4e80 (diff) | |
| download | seaweedfs-4b28c5f6c33f0dd0eed07ad24936b30f76b8d449.tar.xz seaweedfs-4b28c5f6c33f0dd0eed07ad24936b30f76b8d449.zip | |
filer.remote.sync: split into buckets mode and single directory mode
Diffstat (limited to 'weed/command/filer_remote_sync_dir.go')
| -rw-r--r-- | weed/command/filer_remote_sync_dir.go | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go new file mode 100644 index 000000000..65edb76db --- /dev/null +++ b/weed/command/filer_remote_sync_dir.go @@ -0,0 +1,220 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "os" + "strings" + "time" +) + +func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error { + + // read filer remote storage mount mappings + _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir) + if detectErr != nil { + return fmt.Errorf("read mount info: %v", detectErr) + } + + eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource) + if err != nil { + return err + } + + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) + return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs) + }) + + lastOffsetTs := collectLastSyncOffset(option, mountedDir) + + return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync", + mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) +} + +func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return nil, err + } + + handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry == nil { + return nil + } + if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) + if readErr != nil { + return fmt.Errorf("unmarshal mappings: %v", readErr) + } + if remoteLoc, found := mappings.Mappings[mountedDir]; found { + if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path { + glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc) + } + } else { + glog.V(0).Infof("unmounted %s exiting ...", mountedDir) + os.Exit(0) + } + } + if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) + } + remoteStorage = conf + if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil { + client = newClient + } else { + return err + } + } + + return nil + } + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + return handleEtcRemoteChanges(resp) + } + + if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + if message.OldEntry == nil && message.NewEntry != nil { + if !filer.HasData(message.NewEntry) { + return nil + } + glog.V(2).Infof("create: %+v", resp) + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping creating: %+v", resp) + return nil + } + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if message.NewEntry.IsDirectory { + glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) + return client.WriteDirectory(dest, message.NewEntry) + } + glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) + reader := filer.NewFileReader(filerSource, message.NewEntry) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + if message.OldEntry != nil && message.NewEntry == nil { + glog.V(2).Infof("delete: %+v", resp) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + if message.OldEntry.IsDirectory { + glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest)) + return client.RemoveDirectory(dest) + } + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest)) + return client.DeleteFile(dest) + } + if message.OldEntry != nil && message.NewEntry != nil { + oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping updating: %+v", resp) + return nil + } + if message.NewEntry.IsDirectory { + return client.WriteDirectory(dest, message.NewEntry) + } + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + if filer.IsSameData(message.OldEntry, message.NewEntry) { + glog.V(2).Infof("update meta: %+v", resp) + return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry) + } + } + glog.V(2).Infof("update: %+v", resp) + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest)) + if err := client.DeleteFile(oldDest); err != nil { + return err + } + reader := filer.NewFileReader(filerSource, message.NewEntry) + glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + + return nil + } + return eachEntryFunc, nil +} + +func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time { + // 1. specified by timeAgo + // 2. last offset timestamp for this directory + // 3. directory creation time + var lastOffsetTs time.Time + if *option.timeAgo == 0 { + mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) + if err != nil { + glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err) + return time.Now() + } + + lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir) + if mountedDirEntry != nil { + if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { + lastOffsetTs = time.Unix(0, lastOffsetTsNs) + glog.V(0).Infof("resume from %v", lastOffsetTs) + } else { + lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) + } + } else { + lastOffsetTs = time.Now() + } + } else { + lastOffsetTs = time.Now().Add(-*option.timeAgo) + } + return lastOffsetTs +} + +func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { + source := string(sourcePath[len(mountDir):]) + dest := util.FullPath(remoteMountLocation.Path).Child(source) + return &remote_pb.RemoteStorageLocation{ + Name: remoteMountLocation.Name, + Bucket: remoteMountLocation.Bucket, + Path: string(dest), + } +} + +func shouldSendToRemote(entry *filer_pb.Entry) bool { + if entry.RemoteEntry == nil { + return true + } + if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime { + return true + } + return false +} + +func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { + entry.RemoteEntry = remoteEntry + return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) + return err + }) +} |
