Diffstat (limited to 'weed/shell/command_remote_cache.go')
-rw-r--r--  weed/shell/command_remote_cache.go  82
1 file changed, 66 insertions(+), 16 deletions(-)
diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go
index 21c479258..58b6d7b5e 100644
--- a/weed/shell/command_remote_cache.go
+++ b/weed/shell/command_remote_cache.go
@@ -5,8 +5,10 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
+ "sync"
)
func init() {
@@ -32,11 +34,14 @@ func (c *commandRemoteCache) Help() string {
remote.cache -dir=/xxx
remote.cache -dir=/xxx/some/sub/dir
remote.cache -dir=/xxx/some/sub/dir -include=*.pdf
+ remote.cache -dir=/xxx/some/sub/dir -exclude=*.txt
+	remote.cache -maxSize=1024000 # cache files smaller than about 1MB
+	remote.cache -maxAge=3600 # cache files less than 1 hour old
This is designed to run regularly. So you can add it to some cronjob.
If a file is already synchronized with the remote copy, the file will be skipped to avoid unnecessary copy.
-	The actual data copying goes through volume servers.
+	The actual data copying goes through volume servers in parallel.
`
}
@@ -45,21 +50,45 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
- dir := remoteMountCommand.String("dir", "", "a directory in filer")
+ dir := remoteMountCommand.String("dir", "", "a mounted directory or one of its sub folders in filer")
+ concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading")
fileFiler := newFileFilter(remoteMountCommand)
if err = remoteMountCommand.Parse(args); err != nil {
return nil
}
- localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
- if detectErr != nil{
+ if *dir != "" {
+ if err := c.doCacheOneDirectory(commandEnv, writer, *dir, fileFiler, *concurrency); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ mappings, err := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
+ if err != nil {
+ return err
+ }
+
+ for key, _ := range mappings.Mappings {
+ if err := c.doCacheOneDirectory(commandEnv, writer, key, fileFiler, *concurrency); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (c *commandRemoteCache) doCacheOneDirectory(commandEnv *CommandEnv, writer io.Writer, dir string, fileFiler *FileFilter, concurrency int) (error) {
+ mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, dir)
+ if detectErr != nil {
+ jsonPrintln(writer, mappings)
return detectErr
}
// pull content from remote
- if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf); err != nil {
- return fmt.Errorf("cache content data: %v", err)
+ if err := c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(dir), fileFiler, remoteStorageConf, concurrency); err != nil {
+ return fmt.Errorf("cache content data on %s: %v", localMountedDir, err)
}
return nil
@@ -106,32 +135,53 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
if entry.RemoteEntry == nil {
return false // should not uncache an entry that is not in remote
}
- if entry.RemoteEntry.LastLocalSyncTsNs > 0 && len(entry.Chunks) > 0 {
+ if entry.RemoteEntry.LastLocalSyncTsNs > 0 {
return true
}
return false
}
-func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *filer_pb.RemoteConf) error {
+func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error {
+
+ var wg sync.WaitGroup
+ limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
+ var executionErr error
- return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
+ traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
if !shouldCacheToLocal(entry) {
return true // true means recursive traversal should continue
}
- if fileFilter.matches(entry) {
+ if !fileFilter.matches(entry) {
return true
}
- println(dir, entry.Name)
+ wg.Add(1)
+ limitedConcurrentExecutor.Execute(func() {
+ defer wg.Done()
+ fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name))
- remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))
+ remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))
- if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
- fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err)
- return false
- }
+ if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
+ fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err)
+ if executionErr == nil {
+ executionErr = fmt.Errorf("DownloadToLocal %+v: %v\n", remoteLocation, err)
+ }
+ return
+ }
+ fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name))
+ })
return true
})
+ wg.Wait()
+
+ if traverseErr != nil {
+ return traverseErr
+ }
+ if executionErr != nil {
+ return executionErr
+ }
+ return nil
}
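
For readers unfamiliar with the pattern, below is a minimal, self-contained sketch of the bounded-concurrency download loop that the new cacheContentData uses: a counting semaphore caps how many downloads run at once, a WaitGroup waits for all of them, and the first failure is recorded. The limitedExecutor type, downloadOne function, and the file list are illustrative stand-ins, not SeaweedFS APIs; the real code relies on util.NewLimitedConcurrentExecutor and filer.DownloadToLocal as shown in the diff, and the sketch adds a mutex around the shared error purely to keep the example race-free.

package main

import (
	"fmt"
	"sync"
)

// limitedExecutor is a stand-in for SeaweedFS's util.LimitedConcurrentExecutor:
// a buffered channel used as a counting semaphore so that at most
// `concurrency` jobs run at the same time.
type limitedExecutor struct {
	tokens chan struct{}
	wg     sync.WaitGroup
}

func newLimitedExecutor(concurrency int) *limitedExecutor {
	return &limitedExecutor{tokens: make(chan struct{}, concurrency)}
}

func (e *limitedExecutor) execute(job func()) {
	e.wg.Add(1)
	e.tokens <- struct{}{} // blocks while `concurrency` jobs are already running
	go func() {
		defer func() {
			<-e.tokens
			e.wg.Done()
		}()
		job()
	}()
}

func (e *limitedExecutor) wait() { e.wg.Wait() }

// downloadOne is a placeholder for filer.DownloadToLocal in the diff;
// here it only simulates a successful download.
func downloadOne(path string) error {
	return nil
}

func main() {
	// Hypothetical file list standing in for the entries found by
	// recursivelyTraverseDirectory in the diff above.
	files := []string{"/buckets/b1/a.pdf", "/buckets/b1/b.pdf", "/buckets/b1/c.pdf"}

	executor := newLimitedExecutor(2) // e.g. -concurrent=2

	// Record only the first failure; a mutex keeps the shared variable
	// race-free when several downloads fail at once.
	var (
		firstErrMu sync.Mutex
		firstErr   error
	)

	for _, f := range files {
		f := f // capture the loop variable for the goroutine
		executor.execute(func() {
			fmt.Printf("Cache %s ...\n", f)
			if err := downloadOne(f); err != nil {
				firstErrMu.Lock()
				if firstErr == nil {
					firstErr = fmt.Errorf("download %s: %v", f, err)
				}
				firstErrMu.Unlock()
				return
			}
			fmt.Printf("Cache %s Done\n", f)
		})
	}

	executor.wait()
	if firstErr != nil {
		fmt.Println("caching finished with error:", firstErr)
	}
}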