From a539d6489639217c1b2fded6ff7238b2d5e7b5da Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 15 Aug 2021 12:09:54 -0700 Subject: refactor --- weed/shell/command_remote_cache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'weed/shell/command_remote_cache.go') diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 21c479258..f032239f3 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -52,8 +52,9 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io return nil } - localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) + mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) if detectErr != nil{ + jsonPrintln(writer, mappings) return detectErr } -- cgit v1.2.3 From 2158d4fe4d613fcb17ca4f59ea2ebd4e5cd97bbb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 21 Aug 2021 02:17:10 -0700 Subject: adjust help message --- weed/shell/command_remote_cache.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'weed/shell/command_remote_cache.go') diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index f032239f3..abd53461b 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -32,11 +32,14 @@ func (c *commandRemoteCache) Help() string { remote.cache -dir=/xxx remote.cache -dir=/xxx/some/sub/dir remote.cache -dir=/xxx/some/sub/dir -include=*.pdf + remote.cache -dir=/xxx/some/sub/dir -exclude=*.txt + remote.cache -maxSize=1024000 # cache files smaller than 100K + remote.cache -maxAge=3600 # cache files less than 1 hour old This is designed to run regularly. So you can add it to some cronjob. If a file is already synchronized with the remote copy, the file will be skipped to avoid unnecessary copy. - The actual data copying goes through volume severs. + The actual data copying goes through volume severs in parallel. ` } -- cgit v1.2.3 From 05a648bb96df1be5a9261737d8f6fd01600c6a63 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 26 Aug 2021 15:18:34 -0700 Subject: refactor: separating out remote.proto --- weed/shell/command_remote_cache.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'weed/shell/command_remote_cache.go') diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index abd53461b..9570291df 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" ) @@ -110,13 +111,13 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool { if entry.RemoteEntry == nil { return false // should not uncache an entry that is not in remote } - if entry.RemoteEntry.LastLocalSyncTsNs > 0 && len(entry.Chunks) > 0 { + if entry.RemoteEntry.LastLocalSyncTsNs > 0 { return true } return false } -func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *filer_pb.RemoteConf) error { +func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf) error { return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { if !shouldCacheToLocal(entry) { @@ -127,7 +128,7 @@ func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io. return true } - println(dir, entry.Name) + fmt.Fprintf(writer, "Cache %+v ... ", dir.Child(entry.Name)) remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) @@ -135,6 +136,7 @@ func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io. fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) return false } + fmt.Fprintf(writer, "Done\n") return true }) -- cgit v1.2.3 From 6a0bb7106bc7c3524991f8a2a5b78d636613095d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 26 Aug 2021 16:16:26 -0700 Subject: cloud drive: parallelize remote storage downloading --- weed/shell/command_remote_cache.go | 42 +++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 10 deletions(-) (limited to 'weed/shell/command_remote_cache.go') diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 9570291df..647de0d25 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" + "sync" ) func init() { @@ -50,6 +51,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) dir := remoteMountCommand.String("dir", "", "a directory in filer") + concurrency := remoteMountCommand.Int("concurrent", 8, "concurrent file downloading") fileFiler := newFileFilter(remoteMountCommand) if err = remoteMountCommand.Parse(args); err != nil { @@ -63,7 +65,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io } // pull content from remote - if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf); err != nil { + if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf, *concurrency); err != nil { return fmt.Errorf("cache content data: %v", err) } @@ -117,9 +119,13 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool { return false } -func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf) error { +func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error { - return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + var wg sync.WaitGroup + limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency) + var executionErr error + + traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { if !shouldCacheToLocal(entry) { return true // true means recursive traversal should continue } @@ -128,16 +134,32 @@ func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io. return true } - fmt.Fprintf(writer, "Cache %+v ... ", dir.Child(entry.Name)) + wg.Add(1) + limitedConcurrentExecutor.Execute(func() { + defer wg.Done() + fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name)) - remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) + remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) - if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { - fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) - return false - } - fmt.Fprintf(writer, "Done\n") + if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { + fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) + if executionErr == nil { + executionErr = fmt.Errorf("DownloadToLocal %+v: %v\n", remoteLocation, err) + } + return + } + fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name)) + }) return true }) + wg.Wait() + + if traverseErr != nil { + return traverseErr + } + if executionErr != nil { + return executionErr + } + return nil } -- cgit v1.2.3 From 49a8dfb9765bfa5cb9ffd5246e5af582edaaf65d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 26 Aug 2021 17:05:56 -0700 Subject: adjust default concurrent level --- weed/shell/command_remote_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_remote_cache.go') diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 647de0d25..2888ec979 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -51,7 +51,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) dir := remoteMountCommand.String("dir", "", "a directory in filer") - concurrency := remoteMountCommand.Int("concurrent", 8, "concurrent file downloading") + concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading") fileFiler := newFileFilter(remoteMountCommand) if err = remoteMountCommand.Parse(args); err != nil { -- cgit v1.2.3 From 7ce97b59d828b3f6d7bcdab323209740280da3f3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 1 Sep 2021 02:45:42 -0700 Subject: go fmt --- weed/shell/command_remote_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_remote_cache.go') diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 2888ec979..488fea7ea 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -59,7 +59,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io } mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) - if detectErr != nil{ + if detectErr != nil { jsonPrintln(writer, mappings) return detectErr } -- cgit v1.2.3 From d983aa4c7dc4a8f6dc7d783d69669b74df41a749 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 4 Sep 2021 13:58:14 -0700 Subject: correct filtering --- weed/shell/command_remote_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_remote_cache.go') diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 488fea7ea..51f170666 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -130,7 +130,7 @@ func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io. return true // true means recursive traversal should continue } - if fileFilter.matches(entry) { + if !fileFilter.matches(entry) { return true } -- cgit v1.2.3 From 57a95887d23777813ffd580d809bfea9ac84036f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 Sep 2021 14:47:06 -0700 Subject: remote.cache remote.uncache supports all mounted directories --- weed/shell/command_remote_cache.go | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) (limited to 'weed/shell/command_remote_cache.go') diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 51f170666..58b6d7b5e 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -50,7 +50,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) - dir := remoteMountCommand.String("dir", "", "a directory in filer") + dir := remoteMountCommand.String("dir", "", "a mounted directory or one of its sub folders in filer") concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading") fileFiler := newFileFilter(remoteMountCommand) @@ -58,15 +58,37 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io return nil } - mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) + if *dir != "" { + if err := c.doCacheOneDirectory(commandEnv, writer, *dir, fileFiler, *concurrency); err != nil { + return err + } + return nil + } + + mappings, err := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if err != nil { + return err + } + + for key, _ := range mappings.Mappings { + if err := c.doCacheOneDirectory(commandEnv, writer, key, fileFiler, *concurrency); err != nil { + return err + } + } + + return nil +} + +func (c *commandRemoteCache) doCacheOneDirectory(commandEnv *CommandEnv, writer io.Writer, dir string, fileFiler *FileFilter, concurrency int) (error) { + mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, dir) if detectErr != nil { jsonPrintln(writer, mappings) return detectErr } // pull content from remote - if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf, *concurrency); err != nil { - return fmt.Errorf("cache content data: %v", err) + if err := c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(dir), fileFiler, remoteStorageConf, concurrency); err != nil { + return fmt.Errorf("cache content data on %s: %v", localMountedDir, err) } return nil -- cgit v1.2.3