diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-08-15 01:53:46 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-08-15 01:53:46 -0700 |
| commit | 9462f5129a8c6c66200150b41ddfe96a3dd0454e (patch) | |
| tree | 76c1782fe91d0d6adcb63a6435d217d48fd85f78 /weed/shell/command_remote_meta_sync.go | |
| parent | 3ada61c875158dedd24618013da01939cc7acc07 (diff) | |
| download | seaweedfs-9462f5129a8c6c66200150b41ddfe96a3dd0454e.tar.xz seaweedfs-9462f5129a8c6c66200150b41ddfe96a3dd0454e.zip | |
shell: add "remote.meta.sync"
Diffstat (limited to 'weed/shell/command_remote_meta_sync.go')
| -rw-r--r-- | weed/shell/command_remote_meta_sync.go | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go new file mode 100644 index 000000000..7e111143c --- /dev/null +++ b/weed/shell/command_remote_meta_sync.go @@ -0,0 +1,208 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "strings" +) + +func init() { + Commands = append(Commands, &commandRemoteMetaSync{}) +} + +type commandRemoteMetaSync struct { +} + +func (c *commandRemoteMetaSync) Name() string { + return "remote.meta.sync" +} + +func (c *commandRemoteMetaSync) Help() string { + return `synchronize the local file meta data with the remote file metadata + + # assume a remote storage is configured to name "cloud1" + remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy + # mount and pull one bucket + remote.mount -dir=/xxx -remote=cloud1/bucket + + After mount, if the remote file can be changed, + run this command to synchronize the metadata of the mounted folder or any sub folder + + remote.meta.sync -dir=/xxx + remote.meta.sync -dir=/xxx/some/subdir + + This is designed to run regularly. So you can add it to some cronjob. + + If there are no other operations changing remote files, this operation is not needed. + +` +} + +func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + dir := remoteMetaSyncCommand.String("dir", "", "a directory in filer") + + if err = remoteMetaSyncCommand.Parse(args); err != nil { + return nil + } + + localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) + if detectErr != nil{ + return detectErr + } + + // pull metadata from remote + if err = pullMetadata(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil { + return fmt.Errorf("cache content data: %v", err) + } + + return nil +} + +func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (string, *filer_pb.RemoteStorageLocation, *filer_pb.RemoteConf, error) { + mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if listErr != nil { + return "", nil, nil, listErr + } + if dir == "" { + jsonPrintln(writer, mappings) + return "", nil, nil, fmt.Errorf("need to specify '-dir' option") + } + + var localMountedDir string + var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation + for k, loc := range mappings.Mappings { + if strings.HasPrefix(dir, k) { + localMountedDir, remoteStorageMountedLocation = k, loc + } + } + if localMountedDir == "" { + jsonPrintln(writer, mappings) + return "", nil, nil, fmt.Errorf("%s is not mounted", dir) + } + + // find remote storage configuration + remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name) + if err != nil { + return "", nil, nil, err + } + + return localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil +} + +/* + This function update entry.RemoteEntry if the remote has any changes. + + To pull remote updates, or created for the first time, the criteria is: + entry == nil or (entry.RemoteEntry != nil and entry.RemoteEntry.RemoteTag != remote.RemoteTag) + After the meta pull, the entry.RemoteEntry will have: + remoteEntry.LastLocalSyncTsNs == 0 + Attributes.FileSize = uint64(remoteEntry.RemoteSize) + Attributes.Mtime = remoteEntry.RemoteMtime + remoteEntry.RemoteTag = actual remote tag + chunks = nil + + When reading the file content or pulling the file content in "remote.cache", the criteria is: + Attributes.FileSize > 0 and len(chunks) == 0 + After caching the file content, the entry.RemoteEntry will be + remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano() + Attributes.FileSize = uint64(remoteEntry.RemoteSize) + Attributes.Mtime = remoteEntry.RemoteMtime + chunks = non-emtpy + + When "weed filer.remote.sync" to upload local changes to remote, the criteria is: + Attributes.Mtime > remoteEntry.RemoteMtime + Right after "weed filer.remote.sync", the entry.RemoteEntry will be + remoteEntry.LastLocalSyncTsNs = time.Now.UnixNano() + remoteEntry.RemoteSize = actual remote size, which should equal to entry.Attributes.FileSize + remoteEntry.RemoteMtime = actual remote mtime, which should be a little greater than entry.Attributes.Mtime + remoteEntry.RemoteTag = actual remote tag + + + If entry does not exists, need to pull meta + If entry.RemoteEntry == nil, this is a new local change and should not be overwritten + If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag { + the remote version is updated, need to pull meta + } + */ +func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *filer_pb.RemoteConf) error { + + // visit remote storage + remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return err + } + + remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache) + println("local :", localMountedDir) + println("remote:", remoteMountedLocation.Path) + println("local+:", dirToCache) + println("remote+:", remote.Path) + + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + ctx := context.Background() + err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { + localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir) + fmt.Fprint(writer, localDir.Child(name)) + + lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: string(localDir), + Name: name, + }) + var existingEntry *filer_pb.Entry + if lookupErr != nil { + if lookupErr != filer_pb.ErrNotFound { + return lookupErr + } + } else { + existingEntry = lookupResponse.Entry + } + + if existingEntry == nil { + _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: string(localDir), + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: isDirectory, + Attributes: &filer_pb.FuseAttributes{ + FileSize: uint64(remoteEntry.RemoteSize), + Mtime: remoteEntry.RemoteMtime, + FileMode: uint32(0644), + }, + RemoteEntry: remoteEntry, + }, + }) + fmt.Fprintln(writer, " (create)") + return createErr + } else { + if existingEntry.RemoteEntry == nil { + // this is a new local change and should not be overwritten + fmt.Fprintln(writer, " (skip)") + return nil + } + if existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag { + // the remote version is updated, need to pull meta + fmt.Fprintln(writer, " (update)") + return doSaveRemoteEntry(client, string(localDir), existingEntry, remoteEntry) + } + } + fmt.Fprintln(writer, " (skip)") + return nil + }) + return err + }) + + if err != nil { + return err + } + + return nil +} |
