diff options
Diffstat (limited to 'weed/shell/command_remote_cache.go')
| -rw-r--r-- | weed/shell/command_remote_cache.go | 44 |
1 files changed, 34 insertions, 10 deletions
diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index abd53461b..2888ec979 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -5,8 +5,10 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" + "sync" ) func init() { @@ -49,6 +51,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) dir := remoteMountCommand.String("dir", "", "a directory in filer") + concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading") fileFiler := newFileFilter(remoteMountCommand) if err = remoteMountCommand.Parse(args); err != nil { @@ -62,7 +65,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io } // pull content from remote - if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf); err != nil { + if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf, *concurrency); err != nil { return fmt.Errorf("cache content data: %v", err) } @@ -110,15 +113,19 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool { if entry.RemoteEntry == nil { return false // should not uncache an entry that is not in remote } - if entry.RemoteEntry.LastLocalSyncTsNs > 0 && len(entry.Chunks) > 0 { + if entry.RemoteEntry.LastLocalSyncTsNs > 0 { return true } return false } -func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *filer_pb.RemoteConf) error { +func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error { - return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + var wg sync.WaitGroup + limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency) + var executionErr error + + traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { if !shouldCacheToLocal(entry) { return true // true means recursive traversal should continue } @@ -127,15 +134,32 @@ func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io. return true } - println(dir, entry.Name) + wg.Add(1) + limitedConcurrentExecutor.Execute(func() { + defer wg.Done() + fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name)) - remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) + remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) - if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { - fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) - return false - } + if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { + fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) + if executionErr == nil { + executionErr = fmt.Errorf("DownloadToLocal %+v: %v\n", remoteLocation, err) + } + return + } + fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name)) + }) return true }) + wg.Wait() + + if traverseErr != nil { + return traverseErr + } + if executionErr != nil { + return executionErr + } + return nil } |
