diff options
Diffstat (limited to 'weed/shell/command_remote_cache.go')
| -rw-r--r-- | weed/shell/command_remote_cache.go | 82 |
1 files changed, 66 insertions, 16 deletions
diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 21c479258..58b6d7b5e 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -5,8 +5,10 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" + "sync" ) func init() { @@ -32,11 +34,14 @@ func (c *commandRemoteCache) Help() string { remote.cache -dir=/xxx remote.cache -dir=/xxx/some/sub/dir remote.cache -dir=/xxx/some/sub/dir -include=*.pdf + remote.cache -dir=/xxx/some/sub/dir -exclude=*.txt + remote.cache -maxSize=1024000 # cache files smaller than 100K + remote.cache -maxAge=3600 # cache files less than 1 hour old This is designed to run regularly. So you can add it to some cronjob. If a file is already synchronized with the remote copy, the file will be skipped to avoid unnecessary copy. - The actual data copying goes through volume severs. + The actual data copying goes through volume severs in parallel. ` } @@ -45,21 +50,45 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) - dir := remoteMountCommand.String("dir", "", "a directory in filer") + dir := remoteMountCommand.String("dir", "", "a mounted directory or one of its sub folders in filer") + concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading") fileFiler := newFileFilter(remoteMountCommand) if err = remoteMountCommand.Parse(args); err != nil { return nil } - localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) - if detectErr != nil{ + if *dir != "" { + if err := c.doCacheOneDirectory(commandEnv, writer, *dir, fileFiler, *concurrency); err != nil { + return err + } + return nil + } + + mappings, err := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if err != nil { + return err + } + + for key, _ := range mappings.Mappings { + if err := c.doCacheOneDirectory(commandEnv, writer, key, fileFiler, *concurrency); err != nil { + return err + } + } + + return nil +} + +func (c *commandRemoteCache) doCacheOneDirectory(commandEnv *CommandEnv, writer io.Writer, dir string, fileFiler *FileFilter, concurrency int) (error) { + mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, dir) + if detectErr != nil { + jsonPrintln(writer, mappings) return detectErr } // pull content from remote - if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf); err != nil { - return fmt.Errorf("cache content data: %v", err) + if err := c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(dir), fileFiler, remoteStorageConf, concurrency); err != nil { + return fmt.Errorf("cache content data on %s: %v", localMountedDir, err) } return nil @@ -106,32 +135,53 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool { if entry.RemoteEntry == nil { return false // should not uncache an entry that is not in remote } - if entry.RemoteEntry.LastLocalSyncTsNs > 0 && len(entry.Chunks) > 0 { + if entry.RemoteEntry.LastLocalSyncTsNs > 0 { return true } return false } -func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *filer_pb.RemoteConf) error { +func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error { + + var wg sync.WaitGroup + limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency) + var executionErr error - return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { if !shouldCacheToLocal(entry) { return true // true means recursive traversal should continue } - if fileFilter.matches(entry) { + if !fileFilter.matches(entry) { return true } - println(dir, entry.Name) + wg.Add(1) + limitedConcurrentExecutor.Execute(func() { + defer wg.Done() + fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name)) - remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) + remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) - if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { - fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) - return false - } + if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { + fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) + if executionErr == nil { + executionErr = fmt.Errorf("DownloadToLocal %+v: %v\n", remoteLocation, err) + } + return + } + fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name)) + }) return true }) + wg.Wait() + + if traverseErr != nil { + return traverseErr + } + if executionErr != nil { + return executionErr + } + return nil } |
