diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-08-09 14:35:18 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-08-09 14:35:18 -0700 |
| commit | 713c035a6e5c71bbccdb0e1cc5856e1a84fbe122 (patch) | |
| tree | 6d175c8a35343607e7c53fc37fded82cac06af81 /weed/shell | |
| parent | 8cfd4876084432e393a30dadf0b8f466177183be (diff) | |
| download | seaweedfs-713c035a6e5c71bbccdb0e1cc5856e1a84fbe122.tar.xz seaweedfs-713c035a6e5c71bbccdb0e1cc5856e1a84fbe122.zip | |
shell: remote.cache remote.uncache
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_remote_cache.go | 151 | ||||
| -rw-r--r-- | weed/shell/command_remote_mount.go | 29 | ||||
| -rw-r--r-- | weed/shell/command_remote_uncache.go | 98 |
3 files changed, 267 insertions, 11 deletions
diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go new file mode 100644 index 000000000..a9386ac76 --- /dev/null +++ b/weed/shell/command_remote_cache.go @@ -0,0 +1,151 @@ +package shell + +import ( + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "strings" +) + +func init() { + Commands = append(Commands, &commandRemoteCache{}) +} + +type commandRemoteCache struct { +} + +func (c *commandRemoteCache) Name() string { + return "remote.cache" +} + +func (c *commandRemoteCache) Help() string { + return `cache the file content for mounted directories or files + + # assume a remote storage is configured to name "s3_1" + remote.configure -name=s3_1 -type=s3 -access_key=xxx -secret_key=yyy + # mount and pull one bucket + remote.mount -dir=xxx -remote=s3_1/bucket + + # after mount, run one of these command to cache the content of the files + remote.cache -dir=xxx + remote.cache -dir=xxx/some/sub/dir + +` +} + +func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + dir := remoteMountCommand.String("dir", "", "a directory in filer") + + if err = remoteMountCommand.Parse(args); err != nil { + return nil + } + + mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if listErr != nil { + return listErr + } + if *dir == "" { + jsonPrintln(writer, mappings) + fmt.Fprintln(writer, "need to specify '-dir' option") + return nil + } + + var localMountedDir string + var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation + for k, loc := range mappings.Mappings { + if strings.HasPrefix(*dir, k) { + localMountedDir, remoteStorageMountedLocation = k, loc + } + } + if localMountedDir == "" { + jsonPrintln(writer, mappings) + fmt.Fprintf(writer, "%s is not mounted\n", *dir) + return nil + } + + // find remote storage configuration + remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name) + if err != nil { + return err + } + + // pull content from remote + if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil { + return fmt.Errorf("cache content data: %v", err) + } + + return nil +} + +func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) { + + err = filer_pb.ReadDirAllEntries(filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + if !visitEntry(dirPath, entry) { + return nil + } + subDir := dirPath.Child(entry.Name) + if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil { + return err + } + }else { + if !visitEntry(dirPath, entry) { + return nil + } + } + return nil + }) + return +} + +func shouldCacheToLocal(entry *filer_pb.Entry) bool { + if entry.IsDirectory { + return false + } + if entry.RemoteEntry == nil { + return false + } + if entry.RemoteEntry.LocalMtime == 0 && entry.RemoteEntry.RemoteSize > 0 { + return true + } + return false +} + +func mayHaveCachedToLocal(entry *filer_pb.Entry) bool { + if entry.IsDirectory { + return false + } + if entry.RemoteEntry == nil { + return false + } + if entry.RemoteEntry.LocalMtime > 0 && len(entry.Chunks) > 0 { + return true + } + return false +} + +func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *filer_pb.RemoteConf) error { + + return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + if !shouldCacheToLocal(entry) { + return true // true means recursive traversal should continue + } + + println(dir, entry.Name) + + remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) + + if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { + fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) + return false + } + + return true + }) +} diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index f7869e221..a0d41ce6f 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -178,8 +178,8 @@ func (c *commandRemoteMount) pullMetadata(commandEnv *CommandEnv, writer io.Writ Name: name, IsDirectory: isDirectory, Attributes: &filer_pb.FuseAttributes{ - FileSize: uint64(remoteEntry.Size), - Mtime: remoteEntry.LastModifiedAt, + FileSize: uint64(remoteEntry.RemoteSize), + Mtime: remoteEntry.RemoteMtime, FileMode: uint32(0644), }, RemoteEntry: remoteEntry, @@ -187,15 +187,8 @@ func (c *commandRemoteMount) pullMetadata(commandEnv *CommandEnv, writer io.Writ }) return createErr } else { - if existingEntry.RemoteEntry == nil || existingEntry.RemoteEntry.ETag != remoteEntry.ETag { - existingEntry.RemoteEntry = remoteEntry - existingEntry.Attributes.FileSize = uint64(remoteEntry.Size) - existingEntry.Attributes.Mtime = remoteEntry.LastModifiedAt - _, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ - Directory: localDir, - Entry: existingEntry, - }) - return updateErr + if existingEntry.RemoteEntry == nil || existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag { + return doSaveRemoteEntry(client, localDir, existingEntry, remoteEntry) } } return nil @@ -240,3 +233,17 @@ func (c *commandRemoteMount) saveMountMapping(commandEnv *CommandEnv, writer io. return nil } + +func doSaveRemoteEntry(client filer_pb.SeaweedFilerClient, localDir string, existingEntry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { + existingEntry.RemoteEntry = remoteEntry + existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize) + existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime + _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: localDir, + Entry: existingEntry, + }) + if updateErr != nil { + return updateErr + } + return nil +}
\ No newline at end of file diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go new file mode 100644 index 000000000..c07894508 --- /dev/null +++ b/weed/shell/command_remote_uncache.go @@ -0,0 +1,98 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "strings" +) + +func init() { + Commands = append(Commands, &commandRemoteUncache{}) +} + +type commandRemoteUncache struct { +} + +func (c *commandRemoteUncache) Name() string { + return "remote.uncache" +} + +func (c *commandRemoteUncache) Help() string { + return `keep the metadata but remote cache the file content for mounted directories or files + + remote.uncache -dir=xxx + remote.uncache -dir=xxx/some/sub/dir + +` +} + +func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + dir := remoteMountCommand.String("dir", "", "a directory in filer") + + if err = remoteMountCommand.Parse(args); err != nil { + return nil + } + + mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if listErr != nil { + return listErr + } + if *dir == "" { + jsonPrintln(writer, mappings) + fmt.Fprintln(writer, "need to specify '-dir' option") + return nil + } + + var localMountedDir string + for k, _ := range mappings.Mappings { + if strings.HasPrefix(*dir, k) { + localMountedDir = k + } + } + if localMountedDir == "" { + jsonPrintln(writer, mappings) + fmt.Fprintf(writer, "%s is not mounted\n", *dir) + return nil + } + + // pull content from remote + if err = c.uncacheContentData(commandEnv, writer, util.FullPath(*dir)); err != nil { + return fmt.Errorf("cache content data: %v", err) + } + + return nil +} + +func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer io.Writer, dirToCache util.FullPath) error { + + return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + if !mayHaveCachedToLocal(entry) { + return true // true means recursive traversal should continue + } + entry.RemoteEntry.LocalMtime = 0 + + println(dir, entry.Name) + + err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: string(dir), + Entry: entry, + }) + return updateErr + }) + if err != nil { + fmt.Fprintf(writer, "uncache %+v: %v\n", dir.Child(entry.Name), err) + return false + } + + return true + }) +} |
