aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_remote_cache.go42
1 files changed, 32 insertions, 10 deletions
diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go
index 9570291df..647de0d25 100644
--- a/weed/shell/command_remote_cache.go
+++ b/weed/shell/command_remote_cache.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
+ "sync"
)
func init() {
@@ -50,6 +51,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
dir := remoteMountCommand.String("dir", "", "a directory in filer")
+ concurrency := remoteMountCommand.Int("concurrent", 8, "concurrent file downloading")
fileFiler := newFileFilter(remoteMountCommand)
if err = remoteMountCommand.Parse(args); err != nil {
@@ -63,7 +65,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io
}
// pull content from remote
- if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf); err != nil {
+ if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf, *concurrency); err != nil {
return fmt.Errorf("cache content data: %v", err)
}
@@ -117,9 +119,13 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
return false
}
-func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf) error {
+func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error {
- return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
+ var wg sync.WaitGroup
+ limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
+ var executionErr error
+
+ traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
if !shouldCacheToLocal(entry) {
return true // true means recursive traversal should continue
}
@@ -128,16 +134,32 @@ func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.
return true
}
- fmt.Fprintf(writer, "Cache %+v ... ", dir.Child(entry.Name))
+ wg.Add(1)
+ limitedConcurrentExecutor.Execute(func() {
+ defer wg.Done()
+ fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name))
- remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))
+ remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))
- if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
- fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err)
- return false
- }
- fmt.Fprintf(writer, "Done\n")
+ if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
+ fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err)
+ if executionErr == nil {
+ executionErr = fmt.Errorf("DownloadToLocal %+v: %v\n", remoteLocation, err)
+ }
+ return
+ }
+ fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name))
+ })
return true
})
+ wg.Wait()
+
+ if traverseErr != nil {
+ return traverseErr
+ }
+ if executionErr != nil {
+ return executionErr
+ }
+ return nil
}