aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-07-06 13:58:25 -0700
committerGitHub <noreply@github.com>2025-07-06 13:58:25 -0700
commit9b7f3b78b7b73f972665fcfa7f1bb6f12e78058d (patch)
tree04eb798b38d18c04b77638d2ec942e11c84290a7
parentaa668523047c273dc4065dc0f40852efcdf9e9f0 (diff)
downloadseaweedfs-9b7f3b78b7b73f972665fcfa7f1bb6f12e78058d.tar.xz
seaweedfs-9b7f3b78b7b73f972665fcfa7f1bb6f12e78058d.zip
enhance remote.cache to sync meta only, delete local extra (#6941)
-rw-r--r--weed/shell/command_remote_cache.go388
1 file changed, 305 insertions, 83 deletions
diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go
index 987502c14..23a1eccad 100644
--- a/weed/shell/command_remote_cache.go
+++ b/weed/shell/command_remote_cache.go
@@ -4,12 +4,14 @@ import (
"context"
"flag"
"fmt"
+ "io"
+ "sync"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
+ "github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/seaweedfs/seaweedfs/weed/util"
- "io"
- "sync"
)
func init() {
@@ -24,25 +26,28 @@ func (c *commandRemoteCache) Name() string {
}
func (c *commandRemoteCache) Help() string {
- return `cache the file content for mounted directories or files
+ return `comprehensive synchronization and caching between local and remote storage
# assume a remote storage is configured to name "cloud1"
remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
# mount and pull one bucket
remote.mount -dir=/xxx -remote=cloud1/bucket
- # after mount, run one of these command to cache the content of the files
- remote.cache -dir=/xxx
- remote.cache -dir=/xxx/some/sub/dir
- remote.cache -dir=/xxx/some/sub/dir -include=*.pdf
- remote.cache -dir=/xxx/some/sub/dir -exclude=*.txt
- remote.cache -maxSize=1024000 # cache files smaller than 100K
- remote.cache -maxAge=3600 # cache files less than 1 hour old
+ # comprehensive sync and cache: update metadata, cache content, and remove deleted files
+ remote.cache -dir=/xxx # sync metadata, cache content, and remove deleted files (default)
+ remote.cache -dir=/xxx -cacheContent=false # sync metadata and cleanup only, no caching
+ remote.cache -dir=/xxx -deleteLocalExtra=false # skip removal of local files missing from remote
+ remote.cache -dir=/xxx -concurrent=32 # with custom concurrency
+ remote.cache -dir=/xxx -include=*.pdf # only sync PDF files
+ remote.cache -dir=/xxx -exclude=*.tmp # exclude temporary files
+ remote.cache -dir=/xxx -dryRun=true # show what would be done without making changes
- This is designed to run regularly. So you can add it to some cronjob.
- If a file is already synchronized with the remote copy, the file will be skipped to avoid unnecessary copy.
+ This command will:
+ 1. Synchronize metadata from remote storage
+ 2. Cache file content from remote by default
+ 3. Remove local files that no longer exist on remote by default (use -deleteLocalExtra=false to disable)
- The actual data copying goes through volume severs in parallel.
+ This is designed to run regularly. So you can add it to some cronjob.
`
}
@@ -53,50 +58,312 @@ func (c *commandRemoteCache) HasTag(CommandTag) bool {
func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ remoteCacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
- dir := remoteMountCommand.String("dir", "", "a mounted directory or one of its sub folders in filer")
- concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading")
- fileFiler := newFileFilter(remoteMountCommand)
+ dir := remoteCacheCommand.String("dir", "", "a directory in filer")
+ cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
+ deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
+ concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
+ dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
+ fileFiler := newFileFilter(remoteCacheCommand)
- if err = remoteMountCommand.Parse(args); err != nil {
+ if err = remoteCacheCommand.Parse(args); err != nil {
return nil
}
- if *dir != "" {
- if err := c.doCacheOneDirectory(commandEnv, writer, *dir, fileFiler, *concurrency); err != nil {
- return err
- }
- return nil
+ if *dir == "" {
+ return fmt.Errorf("need to specify -dir option")
}
- mappings, err := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
+ mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
+ if detectErr != nil {
+ jsonPrintln(writer, mappings)
+ return detectErr
+ }
+
+ // perform comprehensive sync
+ return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, *dryRun, fileFiler)
+}
+
+func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, dryRun bool, fileFilter *FileFilter) error {
+
+ // visit remote storage
+ remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
if err != nil {
return err
}
- for key, _ := range mappings.Mappings {
- if err := c.doCacheOneDirectory(commandEnv, writer, key, fileFiler, *concurrency); err != nil {
- return err
+ remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToSync)
+
+ // Step 1: Collect all remote files
+ remoteFiles := make(map[string]*filer_pb.RemoteEntry)
+ err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
+ localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
+ fullPath := string(localDir.Child(name))
+ remoteFiles[fullPath] = remoteEntry
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("failed to traverse remote storage: %v", err)
+ }
+
+ fmt.Fprintf(writer, "Found %d files/directories in remote storage\n", len(remoteFiles))
+
+ // Step 2: Collect all local files (only if we need to delete local extra files)
+ localFiles := make(map[string]*filer_pb.Entry)
+ if deleteLocalExtra {
+ err = recursivelyTraverseDirectory(commandEnv, dirToSync, func(dir util.FullPath, entry *filer_pb.Entry) bool {
+ if entry.RemoteEntry != nil { // only consider files that are part of remote mount
+ fullPath := string(dir.Child(entry.Name))
+ localFiles[fullPath] = entry
+ }
+ return true
+ })
+ if err != nil {
+ return fmt.Errorf("failed to traverse local directory: %v", err)
}
+ fmt.Fprintf(writer, "Found %d files/directories in local storage\n", len(localFiles))
+ } else {
+ fmt.Fprintf(writer, "Skipping local file collection (deleteLocalExtra=false)\n")
}
- return nil
-}
+ // Step 3: Determine actions needed
+ var filesToDelete []string
+ var filesToUpdate []string
+ var filesToCache []string
-func (c *commandRemoteCache) doCacheOneDirectory(commandEnv *CommandEnv, writer io.Writer, dir string, fileFiler *FileFilter, concurrency int) error {
- mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, dir)
- if detectErr != nil {
- jsonPrintln(writer, mappings)
- return detectErr
+ // Find files to delete (exist locally but not remotely) - only if deleteLocalExtra is enabled
+ if deleteLocalExtra {
+ for localPath := range localFiles {
+ if _, exists := remoteFiles[localPath]; !exists {
+ filesToDelete = append(filesToDelete, localPath)
+ }
+ }
+ }
+
+ // Find files to update/cache (exist remotely)
+ for remotePath, remoteEntry := range remoteFiles {
+ if deleteLocalExtra {
+ // When deleteLocalExtra is enabled, we have localFiles to compare with
+ if localEntry, exists := localFiles[remotePath]; exists {
+ // File exists locally, check if it needs updating
+ if localEntry.RemoteEntry == nil ||
+ localEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag ||
+ localEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime {
+ filesToUpdate = append(filesToUpdate, remotePath)
+ }
+ // Check if it needs caching
+ if shouldCache && shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
+ filesToCache = append(filesToCache, remotePath)
+ }
+ } else {
+ // File doesn't exist locally, needs to be created
+ filesToUpdate = append(filesToUpdate, remotePath)
+ }
+ } else {
+ // When deleteLocalExtra is disabled, we check each file individually
+ // All remote files are candidates for update/creation
+ filesToUpdate = append(filesToUpdate, remotePath)
+
+ // For caching, we need to check if the local file exists and needs caching
+ if shouldCache {
+ // We need to look up the local file to check if it needs caching
+ localDir, name := util.FullPath(remotePath).DirAndName()
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+ Directory: localDir,
+ Name: name,
+ })
+ if lookupErr == nil {
+ localEntry := lookupResp.Entry
+ if shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
+ filesToCache = append(filesToCache, remotePath)
+ }
+ }
+ return nil // Don't propagate lookup errors here
+ })
+ if err != nil {
+ // Log error but continue
+ fmt.Fprintf(writer, "Warning: failed to lookup local file %s for caching check: %v\n", remotePath, err)
+ }
+ }
+ }
}
- // pull content from remote
- if err := c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(dir), fileFiler, remoteStorageConf, concurrency); err != nil {
- return fmt.Errorf("cache content data on %s: %v", localMountedDir, err)
+ fmt.Fprintf(writer, "Actions needed: %d files to delete, %d files to update, %d files to cache\n",
+ len(filesToDelete), len(filesToUpdate), len(filesToCache))
+
+ if dryRun {
+ fmt.Fprintf(writer, "DRY RUN - showing what would be done:\n")
+ for _, path := range filesToDelete {
+ fmt.Fprintf(writer, "DELETE: %s\n", path)
+ }
+ for _, path := range filesToUpdate {
+ fmt.Fprintf(writer, "UPDATE: %s\n", path)
+ }
+ for _, path := range filesToCache {
+ fmt.Fprintf(writer, "CACHE: %s\n", path)
+ }
+ return nil
}
- return nil
+ // Step 4: Execute actions
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ ctx := context.Background()
+
+ // Delete files that no longer exist on remote (only if deleteLocalExtra is enabled)
+ if deleteLocalExtra {
+ for _, pathToDelete := range filesToDelete {
+ fmt.Fprintf(writer, "Deleting %s... ", pathToDelete)
+
+ dir, name := util.FullPath(pathToDelete).DirAndName()
+ _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
+ Directory: dir,
+ Name: name,
+ IgnoreRecursiveError: false,
+ IsDeleteData: true,
+ IsRecursive: false,
+ IsFromOtherCluster: false,
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "failed: %v\n", err)
+ return err
+ }
+ fmt.Fprintf(writer, "done\n")
+ }
+ }
+
+ // Update metadata for files that exist on remote
+ for _, pathToUpdate := range filesToUpdate {
+ remoteEntry := remoteFiles[pathToUpdate]
+ localDir, name := util.FullPath(pathToUpdate).DirAndName()
+
+ fmt.Fprintf(writer, "Updating metadata for %s... ", pathToUpdate)
+
+ // Check if file exists locally
+ lookupResp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: string(localDir),
+ Name: name,
+ })
+
+ if lookupErr != nil && lookupErr != filer_pb.ErrNotFound {
+ fmt.Fprintf(writer, "failed to lookup: %v\n", lookupErr)
+ continue
+ }
+
+ isDirectory := remoteEntry.RemoteSize == 0 && remoteEntry.RemoteMtime == 0
+ if lookupErr == filer_pb.ErrNotFound {
+ // Create new entry
+ _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
+ Directory: string(localDir),
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: isDirectory,
+ Attributes: &filer_pb.FuseAttributes{
+ FileSize: uint64(remoteEntry.RemoteSize),
+ Mtime: remoteEntry.RemoteMtime,
+ FileMode: uint32(0644),
+ },
+ RemoteEntry: remoteEntry,
+ },
+ })
+ if createErr != nil {
+ fmt.Fprintf(writer, "failed to create: %v\n", createErr)
+ continue
+ }
+ } else {
+ // Update existing entry
+ existingEntry := lookupResp.Entry
+ if existingEntry.RemoteEntry == nil {
+ // This is a local file, skip to avoid overwriting
+ fmt.Fprintf(writer, "skipped (local file)\n")
+ continue
+ }
+
+ existingEntry.RemoteEntry = remoteEntry
+ existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)
+ existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime
+ existingEntry.Attributes.Md5 = nil
+ existingEntry.Chunks = nil
+ existingEntry.Content = nil
+
+ _, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
+ Directory: string(localDir),
+ Entry: existingEntry,
+ })
+ if updateErr != nil {
+ fmt.Fprintf(writer, "failed to update: %v\n", updateErr)
+ continue
+ }
+ }
+ fmt.Fprintf(writer, "done\n")
+ }
+
+ // Cache file content if requested
+ if shouldCache && len(filesToCache) > 0 {
+ fmt.Fprintf(writer, "Caching file content...\n")
+
+ var wg sync.WaitGroup
+ limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
+ var executionErr error
+
+ for _, pathToCache := range filesToCache {
+ wg.Add(1)
+ pathToCacheCopy := pathToCache // Capture for closure
+ limitedConcurrentExecutor.Execute(func() {
+ defer wg.Done()
+
+ // Get local entry (either from localFiles map or by lookup)
+ var localEntry *filer_pb.Entry
+ if deleteLocalExtra {
+ localEntry = localFiles[pathToCacheCopy]
+ if localEntry == nil {
+ fmt.Fprintf(writer, "Warning: skipping cache for %s (local entry not found)\n", pathToCacheCopy)
+ return
+ }
+ } else {
+ // Look up the local entry since we don't have it in localFiles
+ localDir, name := util.FullPath(pathToCacheCopy).DirAndName()
+ lookupErr := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ lookupResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+ Directory: localDir,
+ Name: name,
+ })
+ if err == nil {
+ localEntry = lookupResp.Entry
+ }
+ return err
+ })
+ if lookupErr != nil {
+ fmt.Fprintf(writer, "Warning: failed to lookup local entry for caching %s: %v\n", pathToCacheCopy, lookupErr)
+ return
+ }
+ }
+
+ dir, _ := util.FullPath(pathToCacheCopy).DirAndName()
+ remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, util.FullPath(pathToCacheCopy))
+
+ fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy)
+
+ if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil {
+ fmt.Fprintf(writer, "failed: %v\n", err)
+ if executionErr == nil {
+ executionErr = err
+ }
+ return
+ }
+ fmt.Fprintf(writer, "done\n")
+ })
+ }
+
+ wg.Wait()
+ if executionErr != nil {
+ return executionErr
+ }
+ }
+
+ return nil
+ })
}
func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {
@@ -145,48 +412,3 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
}
return false
}
-
-func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error {
-
- var wg sync.WaitGroup
- limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
- var executionErr error
-
- traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
- if !shouldCacheToLocal(entry) {
- return true // true means recursive traversal should continue
- }
-
- if !fileFilter.matches(entry) {
- return true
- }
-
- wg.Add(1)
- limitedConcurrentExecutor.Execute(func() {
- defer wg.Done()
- fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name))
-
- remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))
-
- if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
- fmt.Fprintf(writer, "CacheRemoteObjectToLocalCluster %+v: %v\n", remoteLocation, err)
- if executionErr == nil {
- executionErr = fmt.Errorf("CacheRemoteObjectToLocalCluster %+v: %v\n", remoteLocation, err)
- }
- return
- }
- fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name))
- })
-
- return true
- })
- wg.Wait()
-
- if traverseErr != nil {
- return traverseErr
- }
- if executionErr != nil {
- return executionErr
- }
- return nil
-}