diff options
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_fs_cat.go | 5 | ||||
| -rw-r--r-- | weed/shell/command_fs_meta_save.go | 5 | ||||
| -rw-r--r-- | weed/shell/command_remote_cache.go | 82 | ||||
| -rw-r--r-- | weed/shell/command_remote_configure.go | 95 | ||||
| -rw-r--r-- | weed/shell/command_remote_meta_sync.go | 48 | ||||
| -rw-r--r-- | weed/shell/command_remote_mount.go | 83 | ||||
| -rw-r--r-- | weed/shell/command_remote_mount_buckets.go | 121 | ||||
| -rw-r--r-- | weed/shell/command_remote_uncache.go | 49 | ||||
| -rw-r--r-- | weed/shell/command_remote_unmount.go | 53 | ||||
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 2 | ||||
| -rw-r--r-- | weed/shell/commands.go | 2 | ||||
| -rw-r--r-- | weed/shell/shell_liner.go | 24 |
12 files changed, 400 insertions, 169 deletions
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 3c5e13663..a5731240d 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -52,6 +52,11 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write return err } + if len(respLookupEntry.Entry.Content) > 0 { + _, err = writer.Write(respLookupEntry.Entry.Content) + return err + } + return filer.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) }) diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index b6f2a9172..d7cc2efef 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -3,6 +3,7 @@ package shell import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "io" "os" "path/filepath" @@ -125,6 +126,10 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { + if strings.HasPrefix(string(parentPath), filer.SystemLogDir) { + return + } + protoMessage := &filer_pb.FullEntry{ Dir: string(parentPath), Entry: entry, diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 21c479258..58b6d7b5e 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -5,8 +5,10 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" + "sync" ) func init() { @@ -32,11 +34,14 @@ func (c *commandRemoteCache) Help() string { remote.cache -dir=/xxx remote.cache -dir=/xxx/some/sub/dir remote.cache -dir=/xxx/some/sub/dir -include=*.pdf + remote.cache -dir=/xxx/some/sub/dir -exclude=*.txt + remote.cache -maxSize=1024000 # cache files smaller than 100K + remote.cache -maxAge=3600 # cache files less than 1 hour old This is designed to run regularly. So you can add it to some cronjob. If a file is already synchronized with the remote copy, the file will be skipped to avoid unnecessary copy. - The actual data copying goes through volume severs. + The actual data copying goes through volume severs in parallel. ` } @@ -45,21 +50,45 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) - dir := remoteMountCommand.String("dir", "", "a directory in filer") + dir := remoteMountCommand.String("dir", "", "a mounted directory or one of its sub folders in filer") + concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading") fileFiler := newFileFilter(remoteMountCommand) if err = remoteMountCommand.Parse(args); err != nil { return nil } - localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) - if detectErr != nil{ + if *dir != "" { + if err := c.doCacheOneDirectory(commandEnv, writer, *dir, fileFiler, *concurrency); err != nil { + return err + } + return nil + } + + mappings, err := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if err != nil { + return err + } + + for key, _ := range mappings.Mappings { + if err := c.doCacheOneDirectory(commandEnv, writer, key, fileFiler, *concurrency); err != nil { + return err + } + } + + return nil +} + +func (c *commandRemoteCache) doCacheOneDirectory(commandEnv *CommandEnv, writer io.Writer, dir string, fileFiler *FileFilter, concurrency int) (error) { + mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, dir) + if detectErr != nil { + jsonPrintln(writer, mappings) return detectErr } // pull content from remote - if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf); err != nil { - return fmt.Errorf("cache content data: %v", err) + if err := c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(dir), fileFiler, remoteStorageConf, concurrency); err != nil { + return fmt.Errorf("cache content data on %s: %v", localMountedDir, err) } return nil @@ -106,32 +135,53 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool { if entry.RemoteEntry == nil { return false // should not uncache an entry that is not in remote } - if entry.RemoteEntry.LastLocalSyncTsNs > 0 && len(entry.Chunks) > 0 { + if entry.RemoteEntry.LastLocalSyncTsNs > 0 { return true } return false } -func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *filer_pb.RemoteConf) error { +func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error { + + var wg sync.WaitGroup + limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency) + var executionErr error - return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { if !shouldCacheToLocal(entry) { return true // true means recursive traversal should continue } - if fileFilter.matches(entry) { + if !fileFilter.matches(entry) { return true } - println(dir, entry.Name) + wg.Add(1) + limitedConcurrentExecutor.Execute(func() { + defer wg.Done() + fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name)) - remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) + remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) - if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { - fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) - return false - } + if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { + fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) + if executionErr == nil { + executionErr = fmt.Errorf("DownloadToLocal %+v: %v\n", remoteLocation, err) + } + return + } + fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name)) + }) return true }) + wg.Wait() + + if traverseErr != nil { + return traverseErr + } + if executionErr != nil { + return executionErr + } + return nil } diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go index 7a9ad1f65..3fa905237 100644 --- a/weed/shell/command_remote_configure.go +++ b/weed/shell/command_remote_configure.go @@ -6,6 +6,8 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" @@ -32,7 +34,14 @@ func (c *commandRemoteConfigure) Help() string { remote.configure # set or update a configuration - remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy + remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy -s3.region=us-east-2 + remote.configure -name=cloud2 -type=gcs -gcs.appCredentialsFile=~/service-account-file.json -gcs.projectId=yyy + remote.configure -name=cloud3 -type=azure -azure.account_name=xxx -azure.account_key=yyy + remote.configure -name=cloud4 -type=aliyun -aliyun.access_key=xxx -aliyun.secret_key=yyy -aliyun.endpoint=oss-cn-shenzhen.aliyuncs.com -aliyun.region=cn-sehnzhen + remote.configure -name=cloud5 -type=tencent -tencent.secret_id=xxx -tencent.secret_key=yyy -tencent.endpoint=cos.ap-guangzhou.myqcloud.com + remote.configure -name=cloud6 -type=wasabi -wasabi.access_key=xxx -wasabi.secret_key=yyy -wasabi.endpoint=s3.us-west-1.wasabisys.com -wasabi.region=us-west-1 + remote.configure -name=cloud7 -type=storj -storj.access_key=xxx -storj.secret_key=yyy -storj.endpoint=https://gateway.us1.storjshare.io + remote.configure -name=cloud8 -type=filebase -filebase.access_key=xxx -filebase.secret_key=yyy -filebase.endpoint=https://s3.filebase.com # delete one configuration remote.configure -delete -name=cloud1 @@ -46,23 +55,79 @@ var ( func (c *commandRemoteConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - conf := &filer_pb.RemoteConf{} + conf := &remote_pb.RemoteConf{} remoteConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) isDelete := remoteConfigureCommand.Bool("delete", false, "delete one remote storage by its name") remoteConfigureCommand.StringVar(&conf.Name, "name", "", "a short name to identify the remote storage") - remoteConfigureCommand.StringVar(&conf.Type, "type", "s3", "storage type, currently only support s3") + remoteConfigureCommand.StringVar(&conf.Type, "type", "s3", fmt.Sprintf("[%s] storage type", remote_storage.GetAllRemoteStorageNames())) remoteConfigureCommand.StringVar(&conf.S3AccessKey, "s3.access_key", "", "s3 access key") remoteConfigureCommand.StringVar(&conf.S3SecretKey, "s3.secret_key", "", "s3 secret key") remoteConfigureCommand.StringVar(&conf.S3Region, "s3.region", "us-east-2", "s3 region") remoteConfigureCommand.StringVar(&conf.S3Endpoint, "s3.endpoint", "", "endpoint for s3-compatible local object store") + remoteConfigureCommand.StringVar(&conf.S3StorageClass, "s3.storage_class", "", "s3 storage class") + remoteConfigureCommand.BoolVar(&conf.S3ForcePathStyle, "s3.force_path_style", true, "s3 force path style") + remoteConfigureCommand.BoolVar(&conf.S3V4Signature, "s3.v4_signature", false, "s3 V4 signature") + + remoteConfigureCommand.StringVar(&conf.GcsGoogleApplicationCredentials, "gcs.appCredentialsFile", "", "google cloud storage credentials file, default to use env GOOGLE_APPLICATION_CREDENTIALS") + remoteConfigureCommand.StringVar(&conf.GcsProjectId, "gcs.projectId", "", "google cloud storage project id, default to use env GOOGLE_CLOUD_PROJECT") + + remoteConfigureCommand.StringVar(&conf.AzureAccountName, "azure.account_name", "", "azure account name, default to use env AZURE_STORAGE_ACCOUNT") + remoteConfigureCommand.StringVar(&conf.AzureAccountKey, "azure.account_key", "", "azure account name, default to use env AZURE_STORAGE_ACCESS_KEY") + + remoteConfigureCommand.StringVar(&conf.BackblazeKeyId, "b2.key_id", "", "backblaze keyID") + remoteConfigureCommand.StringVar(&conf.BackblazeApplicationKey, "b2.application_key", "", "backblaze applicationKey. Note that your Master Application Key will not work with the S3 Compatible API. You must create a new key that is eligible for use. For more information: https://help.backblaze.com/hc/en-us/articles/360047425453") + remoteConfigureCommand.StringVar(&conf.BackblazeEndpoint, "b2.endpoint", "", "backblaze endpoint") + + remoteConfigureCommand.StringVar(&conf.AliyunAccessKey, "aliyun.access_key", "", "Aliyun access key, default to use env ALICLOUD_ACCESS_KEY_ID") + remoteConfigureCommand.StringVar(&conf.AliyunSecretKey, "aliyun.secret_key", "", "Aliyun secret key, default to use env ALICLOUD_ACCESS_KEY_SECRET") + remoteConfigureCommand.StringVar(&conf.AliyunEndpoint, "aliyun.endpoint", "", "Aliyun endpoint") + remoteConfigureCommand.StringVar(&conf.AliyunRegion, "aliyun.region", "", "Aliyun region") + + remoteConfigureCommand.StringVar(&conf.TencentSecretId, "tencent.secret_id", "", "Tencent Secret Id, default to use env COS_SECRETID") + remoteConfigureCommand.StringVar(&conf.TencentSecretKey, "tencent.secret_key", "", "Tencent secret key, default to use env COS_SECRETKEY") + remoteConfigureCommand.StringVar(&conf.TencentEndpoint, "tencent.endpoint", "", "Tencent endpoint") + + remoteConfigureCommand.StringVar(&conf.BaiduAccessKey, "baidu.access_key", "", "Baidu access key, default to use env BDCLOUD_ACCESS_KEY") + remoteConfigureCommand.StringVar(&conf.BaiduSecretKey, "baidu.secret_key", "", "Baidu secret key, default to use env BDCLOUD_SECRET_KEY") + remoteConfigureCommand.StringVar(&conf.BaiduEndpoint, "baidu.endpoint", "", "Baidu endpoint") + remoteConfigureCommand.StringVar(&conf.BaiduRegion, "baidu.region", "", "Baidu region") + + remoteConfigureCommand.StringVar(&conf.WasabiAccessKey, "wasabi.access_key", "", "Wasabi access key") + remoteConfigureCommand.StringVar(&conf.WasabiSecretKey, "wasabi.secret_key", "", "Wasabi secret key") + remoteConfigureCommand.StringVar(&conf.WasabiEndpoint, "wasabi.endpoint", "", "Wasabi endpoint, see https://wasabi.com/wp-content/themes/wasabi/docs/API_Guide/index.html#t=topics%2Fapidiff-intro.htm") + remoteConfigureCommand.StringVar(&conf.WasabiRegion, "wasabi.region", "", "Wasabi region") + + remoteConfigureCommand.StringVar(&conf.FilebaseAccessKey, "filebase.access_key", "", "Filebase access key") + remoteConfigureCommand.StringVar(&conf.FilebaseSecretKey, "filebase.secret_key", "", "Filebase secret key") + remoteConfigureCommand.StringVar(&conf.FilebaseEndpoint, "filebase.endpoint", "", "Filebase endpoint, https://s3.filebase.com") + + remoteConfigureCommand.StringVar(&conf.StorjAccessKey, "storj.access_key", "", "Storj access key") + remoteConfigureCommand.StringVar(&conf.StorjSecretKey, "storj.secret_key", "", "Storj secret key") + remoteConfigureCommand.StringVar(&conf.StorjEndpoint, "storj.endpoint", "", "Storj endpoint") + + var namenodes arrayFlags + remoteConfigureCommand.Var(&namenodes, "hdfs.namenodes", "hdfs name node and port, example: namenode1:8020,namenode2:8020") + remoteConfigureCommand.StringVar(&conf.HdfsUsername, "hdfs.username", "", "hdfs user name") + remoteConfigureCommand.StringVar(&conf.HdfsServicePrincipalName, "hdfs.servicePrincipalName", "", `Kerberos service principal name for the namenode + +Example: hdfs/namenode.hadoop.docker +Namenode running as service 'hdfs' with FQDN 'namenode.hadoop.docker'. +`) + remoteConfigureCommand.StringVar(&conf.HdfsDataTransferProtection, "hdfs.dataTransferProtection", "", "[authentication|integrity|privacy] Kerberos data transfer protection") if err = remoteConfigureCommand.Parse(args); err != nil { return nil } + if conf.Type != "s3" { + // clear out the default values + conf.S3Region = "" + conf.S3ForcePathStyle = false + } + if conf.Name == "" { return c.listExistingRemoteStorages(commandEnv, writer) } @@ -89,13 +154,20 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { return nil } - conf := &filer_pb.RemoteConf{} + conf := &remote_pb.RemoteConf{} if err := proto.Unmarshal(entry.Content, conf); err != nil { return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err) } - conf.S3SecretKey = "" + // change secret key to stars + conf.S3SecretKey = strings.Repeat("*", len(conf.S3SecretKey)) + conf.AliyunSecretKey = strings.Repeat("*", len(conf.AliyunSecretKey)) + conf.BaiduAccessKey = strings.Repeat("*", len(conf.BaiduAccessKey)) + conf.FilebaseSecretKey = strings.Repeat("*", len(conf.FilebaseSecretKey)) + conf.StorjSecretKey = strings.Repeat("*", len(conf.StorjSecretKey)) + conf.TencentSecretKey = strings.Repeat("*", len(conf.TencentSecretKey)) + conf.WasabiSecretKey = strings.Repeat("*", len(conf.WasabiSecretKey)) m := jsonpb.Marshaler{ EmitDefaults: false, @@ -135,7 +207,7 @@ func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, wri } -func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, writer io.Writer, conf *filer_pb.RemoteConf) error { +func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, writer io.Writer, conf *remote_pb.RemoteConf) error { data, err := proto.Marshal(conf) if err != nil { @@ -151,3 +223,14 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write return nil } + +type arrayFlags []string + +func (i *arrayFlags) String() string { + return "my string representation" +} + +func (i *arrayFlags) Set(value string) error { + *i = append(*i, value) + return nil +} diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go index b6fb15a62..5dbf55987 100644 --- a/weed/shell/command_remote_meta_sync.go +++ b/weed/shell/command_remote_meta_sync.go @@ -6,10 +6,10 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" "io" - "strings" ) func init() { @@ -54,55 +54,29 @@ func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer return nil } - localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) - if detectErr != nil{ + mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) + if detectErr != nil { + jsonPrintln(writer, mappings) return detectErr } // pull metadata from remote if err = pullMetadata(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil { - return fmt.Errorf("cache content data: %v", err) + return fmt.Errorf("cache meta data: %v", err) } return nil } -func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (string, *filer_pb.RemoteStorageLocation, *filer_pb.RemoteConf, error) { - mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) - if listErr != nil { - return "", nil, nil, listErr - } - if dir == "" { - jsonPrintln(writer, mappings) - return "", nil, nil, fmt.Errorf("need to specify '-dir' option") - } - - var localMountedDir string - var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation - for k, loc := range mappings.Mappings { - if strings.HasPrefix(dir, k) { - localMountedDir, remoteStorageMountedLocation = k, loc - } - } - if localMountedDir == "" { - jsonPrintln(writer, mappings) - return "", nil, nil, fmt.Errorf("%s is not mounted", dir) - } - - // find remote storage configuration - remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name) - if err != nil { - return "", nil, nil, err - } - - return localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil +func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) { + return filer.DetectMountInfo(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, dir) } /* This function update entry.RemoteEntry if the remote has any changes. To pull remote updates, or created for the first time, the criteria is: - entry == nil or (entry.RemoteEntry != nil and entry.RemoteEntry.RemoteTag != remote.RemoteTag) + entry == nil or (entry.RemoteEntry != nil and (entry.RemoteEntry.RemoteTag != remote.RemoteTag or entry.RemoteEntry.RemoteMTime < remote.RemoteMTime )) After the meta pull, the entry.RemoteEntry will have: remoteEntry.LastLocalSyncTsNs == 0 Attributes.FileSize = uint64(remoteEntry.RemoteSize) @@ -132,8 +106,8 @@ func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (stri If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag { the remote version is updated, need to pull meta } - */ -func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *filer_pb.RemoteConf) error { +*/ +func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *remote_pb.RemoteConf) error { // visit remote storage remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf) @@ -184,7 +158,7 @@ func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util fmt.Fprintln(writer, " (skip)") return nil } - if existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag { + if existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag || existingEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime { // the remote version is updated, need to pull meta fmt.Fprintln(writer, " (update)") return doSaveRemoteEntry(client, string(localDir), existingEntry, remoteEntry) diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index 077c64e94..c2d9ec6ba 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -6,11 +6,15 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "io" + "os" + "strings" + "time" ) func init() { @@ -58,29 +62,31 @@ func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io return err } - remoteStorageLocation := remote_storage.ParseLocation(*remote) - // find configuration for remote storage - // remotePath is /<bucket>/path/to/dir - remoteConf, err := c.findRemoteStorageConfiguration(commandEnv, writer, remoteStorageLocation) + remoteConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote_storage.ParseLocationName(*remote)) if err != nil { return fmt.Errorf("find configuration for %s: %v", *remote, err) } + remoteStorageLocation, err := remote_storage.ParseRemoteLocation(remoteConf.Type, *remote) + if err != nil { + return err + } + // sync metadata from remote - if err = c.syncMetadata(commandEnv, writer, *dir, *nonEmpty, remoteConf, remoteStorageLocation); err != nil { + if err = syncMetadata(commandEnv, writer, *dir, *nonEmpty, remoteConf, remoteStorageLocation); err != nil { return fmt.Errorf("pull metadata: %v", err) } // store a mount configuration in filer - if err = c.saveMountMapping(commandEnv, writer, *dir, remoteStorageLocation); err != nil { + if err = filer.InsertMountMapping(commandEnv, *dir, remoteStorageLocation); err != nil { return fmt.Errorf("save mount mapping: %v", err) } return nil } -func listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (mappings *filer_pb.RemoteStorageMapping, err error) { +func listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (mappings *remote_pb.RemoteStorageMapping, err error) { // read current mapping mappings, err = filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) @@ -95,6 +101,9 @@ func listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) ( } func jsonPrintln(writer io.Writer, message proto.Message) error { + if message == nil { + return nil + } m := jsonpb.Marshaler{ EmitDefaults: false, Indent: " ", @@ -105,13 +114,7 @@ func jsonPrintln(writer io.Writer, message proto.Message) error { return err } -func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) { - - return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name) - -} - -func (c *commandRemoteMount) syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *filer_pb.RemoteConf, remote *filer_pb.RemoteStorageLocation) error { +func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *remote_pb.RemoteConf, remote *remote_pb.RemoteStorageLocation) error { // find existing directory, and ensure the directory is empty err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { @@ -121,7 +124,24 @@ func (c *commandRemoteMount) syncMetadata(commandEnv *CommandEnv, writer io.Writ Name: name, }) if lookupErr != nil { - return fmt.Errorf("lookup %s: %v", dir, lookupErr) + if strings.Contains(lookupErr.Error(), filer_pb.ErrNotFound.Error()) { + _, createErr := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: parent, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0644 | os.ModeDir), + }, + RemoteEntry: &filer_pb.RemoteEntry{ + StorageName: remoteConf.Name, + }, + }, + }) + return createErr + } } mountToDirIsEmpty := true @@ -148,38 +168,7 @@ func (c *commandRemoteMount) syncMetadata(commandEnv *CommandEnv, writer io.Writ // pull metadata from remote if err = pullMetadata(commandEnv, writer, util.FullPath(dir), remote, util.FullPath(dir), remoteConf); err != nil { - return fmt.Errorf("cache content data: %v", err) - } - - return nil -} - -func (c *commandRemoteMount) saveMountMapping(commandEnv *CommandEnv, writer io.Writer, dir string, remoteStorageLocation *filer_pb.RemoteStorageLocation) (err error) { - - // read current mapping - var oldContent, newContent []byte - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - oldContent, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE) - return err - }) - if err != nil { - if err != filer_pb.ErrNotFound { - return fmt.Errorf("read existing mapping: %v", err) - } - } - - // add new mapping - newContent, err = filer.AddRemoteStorageMapping(oldContent, dir, remoteStorageLocation) - if err != nil { - return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err) - } - - // save back - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE, newContent) - }) - if err != nil { - return fmt.Errorf("save mapping: %v", err) + return fmt.Errorf("cache metadata: %v", err) } return nil diff --git a/weed/shell/command_remote_mount_buckets.go b/weed/shell/command_remote_mount_buckets.go new file mode 100644 index 000000000..f76629193 --- /dev/null +++ b/weed/shell/command_remote_mount_buckets.go @@ -0,0 +1,121 @@ +package shell + +import ( + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "path/filepath" + "regexp" +) + +func init() { + Commands = append(Commands, &commandRemoteMountBuckets{}) +} + +type commandRemoteMountBuckets struct { +} + +func (c *commandRemoteMountBuckets) Name() string { + return "remote.mount.buckets" +} + +func (c *commandRemoteMountBuckets) Help() string { + return `mount all buckets in remote storage and pull its metadata + + # assume a remote storage is configured to name "cloud1" + remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy + + # mount all buckets + remote.mount.buckets -remote=cloud1 + + # after mount, start a separate process to write updates to remote storage + weed filer.remote.sync -filer=<filerHost>:<filerPort> -createBucketAt=cloud1 + +` +} + +func (c *commandRemoteMountBuckets) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMountBucketsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + remote := remoteMountBucketsCommand.String("remote", "", "an already configured storage name") + bucketPattern := remoteMountBucketsCommand.String("bucketPattern", "", "match existing bucket name with wildcard characters '*' and '?'") + trimBucketSuffix := remoteMountBucketsCommand.Bool("trimBucketSuffix", true, "remote suffix auto generated by 'weed filer.remote.sync'") + apply := remoteMountBucketsCommand.Bool("apply", false, "apply the mount for listed buckets") + + if err = remoteMountBucketsCommand.Parse(args); err != nil { + return nil + } + + if *remote == "" { + _, err = listExistingRemoteStorageMounts(commandEnv, writer) + return err + } + + // find configuration for remote storage + remoteConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, *remote) + if err != nil { + return fmt.Errorf("find configuration for %s: %v", *remote, err) + } + + // get storage client + remoteStorageClient, err := remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return fmt.Errorf("get storage client for %s: %v", *remote, err) + } + + buckets, err := remoteStorageClient.ListBuckets() + if err != nil { + return fmt.Errorf("list buckets on %s: %v", *remote, err) + } + + fillerBucketsPath, err := readFilerBucketsPath(commandEnv) + if err != nil { + return fmt.Errorf("read filer buckets path: %v", err) + } + + hasSuffixPattern, _ := regexp.Compile(".+-[0-9][0-9][0-9][0-9]") + + for _, bucket := range buckets { + if *bucketPattern != "" { + if matched, _ := filepath.Match(*bucketPattern, bucket.Name); !matched { + continue + } + } + + fmt.Fprintf(writer, "bucket %s\n", bucket.Name) + localBucketName := bucket.Name + if *trimBucketSuffix { + if hasSuffixPattern.MatchString(localBucketName) { + localBucketName = localBucketName[:len(localBucketName)-5] + fmt.Fprintf(writer, " mount bucket %s as %s\n", bucket.Name, localBucketName) + } + } + if *apply { + + dir := util.FullPath(fillerBucketsPath).Child(localBucketName) + remoteStorageLocation := &remote_pb.RemoteStorageLocation{ + Name: *remote, + Bucket: bucket.Name, + Path: "/", + } + + // sync metadata from remote + if err = syncMetadata(commandEnv, writer, string(dir), true, remoteConf, remoteStorageLocation); err != nil { + return fmt.Errorf("pull metadata on %+v: %v", remoteStorageLocation, err) + } + + // store a mount configuration in filer + if err = filer.InsertMountMapping(commandEnv, string(dir), remoteStorageLocation); err != nil { + return fmt.Errorf("save mount mapping %s to %+v: %v", dir, remoteStorageLocation, err) + } + + } + } + + return nil +} diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go index 0e5152f78..0d4e0cf7d 100644 --- a/weed/shell/command_remote_uncache.go +++ b/weed/shell/command_remote_uncache.go @@ -33,6 +33,8 @@ func (c *commandRemoteUncache) Help() string { remote.uncache -dir=/xxx/some/sub/dir remote.uncache -dir=/xxx/some/sub/dir -include=*.pdf remote.uncache -dir=/xxx/some/sub/dir -exclude=*.txt + remote.uncache -minSize=1024000 # uncache files larger than 100K + remote.uncache -minAge=3600 # uncache files older than 1 hour ` } @@ -52,27 +54,30 @@ func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer if listErr != nil { return listErr } - if *dir == "" { - jsonPrintln(writer, mappings) - fmt.Fprintln(writer, "need to specify '-dir' option") - return nil - } + if *dir != "" { + var localMountedDir string + for k := range mappings.Mappings { + if strings.HasPrefix(*dir, k) { + localMountedDir = k + } + } + if localMountedDir == "" { + jsonPrintln(writer, mappings) + fmt.Fprintf(writer, "%s is not mounted\n", *dir) + return nil + } - var localMountedDir string - for k := range mappings.Mappings { - if strings.HasPrefix(*dir, k) { - localMountedDir = k + // pull content from remote + if err = c.uncacheContentData(commandEnv, writer, util.FullPath(*dir), fileFiler); err != nil { + return fmt.Errorf("uncache content data: %v", err) } - } - if localMountedDir == "" { - jsonPrintln(writer, mappings) - fmt.Fprintf(writer, "%s is not mounted\n", *dir) return nil } - // pull content from remote - if err = c.uncacheContentData(commandEnv, writer, util.FullPath(*dir), fileFiler); err != nil { - return fmt.Errorf("cache content data: %v", err) + for key, _ := range mappings.Mappings { + if err := c.uncacheContentData(commandEnv, writer, util.FullPath(key), fileFiler); err != nil { + return err + } } return nil @@ -81,11 +86,12 @@ func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer io.Writer, dirToCache util.FullPath, fileFilter *FileFilter) error { return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + if !mayHaveCachedToLocal(entry) { return true // true means recursive traversal should continue } - if fileFilter.matches(entry) { + if !fileFilter.matches(entry) { return true } @@ -96,7 +102,7 @@ func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer entry.RemoteEntry.LastLocalSyncTsNs = 0 entry.Chunks = nil - println(dir, entry.Name) + fmt.Fprintf(writer, "Uncache %+v ... ", dir.Child(entry.Name)) err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ @@ -109,6 +115,7 @@ func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer fmt.Fprintf(writer, "uncache %+v: %v\n", dir.Child(entry.Name), err) return false } + fmt.Fprintf(writer, "Done\n") return true }) @@ -137,12 +144,12 @@ func newFileFilter(remoteMountCommand *flag.FlagSet) (ff *FileFilter) { func (ff *FileFilter) matches(entry *filer_pb.Entry) bool { if *ff.include != "" { if ok, _ := filepath.Match(*ff.include, entry.Name); !ok { - return true + return false } } if *ff.exclude != "" { if ok, _ := filepath.Match(*ff.exclude, entry.Name); ok { - return true + return false } } if *ff.minSize != -1 { @@ -165,5 +172,5 @@ func (ff *FileFilter) matches(entry *filer_pb.Entry) bool { return false } } - return false + return true } diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go index b65d125aa..04fd5e388 100644 --- a/weed/shell/command_remote_unmount.go +++ b/weed/shell/command_remote_unmount.go @@ -6,8 +6,10 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" "io" + "time" ) func init() { @@ -58,25 +60,26 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer return fmt.Errorf("directory %s is not mounted", *dir) } + // store a mount configuration in filer + fmt.Fprintf(writer, "deleting mount for %s ...\n", *dir) + if err = filer.DeleteMountMapping(commandEnv, *dir); err != nil { + return fmt.Errorf("delete mount mapping: %v", err) + } + // purge mounted data + fmt.Fprintf(writer, "purge %s ...\n", *dir) if err = c.purgeMountedData(commandEnv, *dir); err != nil { return fmt.Errorf("purge mounted data: %v", err) } - // store a mount configuration in filer - if err = c.deleteMountMapping(commandEnv, *dir); err != nil { - return fmt.Errorf("delete mount mapping: %v", err) + // reset remote sync offset in case the folder is mounted again + if err = remote_storage.SetSyncOffset(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, *dir, time.Now().UnixNano()); err != nil { + return fmt.Errorf("reset remote.sync offset for %s: %v", *dir, err) } return nil } -func (c *commandRemoteUnmount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) { - - return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name) - -} - func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error { // find existing directory, and ensure the directory is empty @@ -100,6 +103,8 @@ func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir stri mkdirErr := filer_pb.DoMkdir(client, parent, name, func(entry *filer_pb.Entry) { entry.Attributes = oldEntry.Attributes entry.Extended = oldEntry.Extended + entry.Attributes.Crtime = time.Now().Unix() + entry.Attributes.Mtime = time.Now().Unix() }) if mkdirErr != nil { return fmt.Errorf("mkdir %s: %v", dir, mkdirErr) @@ -114,33 +119,3 @@ func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir stri return nil } -func (c *commandRemoteUnmount) deleteMountMapping(commandEnv *CommandEnv, dir string) (err error) { - - // read current mapping - var oldContent, newContent []byte - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - oldContent, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE) - return err - }) - if err != nil { - if err != filer_pb.ErrNotFound { - return fmt.Errorf("read existing mapping: %v", err) - } - } - - // add new mapping - newContent, err = filer.RemoveRemoteStorageMapping(oldContent, dir) - if err != nil { - return fmt.Errorf("delete mount %s: %v", dir, err) - } - - // save back - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE, newContent) - }) - if err != nil { - return fmt.Errorf("save mapping: %v", err) - } - - return nil -} diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 20d004c6b..efd5ae5de 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -29,7 +29,7 @@ func (c *commandVolumeFixReplication) Name() string { } func (c *commandVolumeFixReplication) Help() string { - return `add replicas to volumes that are missing replicas + return `add or remove replicas to volumes that are missing replicas or over-replicated This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop. diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 5497e89cc..aef71b419 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -98,7 +98,7 @@ var _ = filer_pb.FilerClient(&CommandEnv{}) func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - filerGrpcAddress := fmt.Sprintf("%s:%d", ce.option.FilerHost, ce.option.FilerPort+10000) + filerGrpcAddress := util.JoinHostPort(ce.option.FilerHost, int(ce.option.FilerPort+10000)) return pb.WithGrpcFilerClient(filerGrpcAddress, ce.option.GrpcDialOption, fn) } diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index 765b0efda..db9e815ff 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -1,7 +1,9 @@ package shell import ( + "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util/grace" "io" "os" @@ -45,6 +47,24 @@ func RunShell(options ShellOptions) { go commandEnv.MasterClient.KeepConnectedToMaster() commandEnv.MasterClient.WaitUntilConnected() + if commandEnv.option.FilerAddress != "" { + commandEnv.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + if resp.ClusterId != "" { + fmt.Printf(` +--- +Free Monitoring Data URL: +https://cloud.seaweedfs.com/ui/%s +--- +`, resp.ClusterId) + } + return nil + }) + } + for { cmd, err := line.Prompt("> ") if err != nil { @@ -64,10 +84,12 @@ func RunShell(options ShellOptions) { func processEachCmd(reg *regexp.Regexp, cmd string, commandEnv *CommandEnv) bool { cmds := reg.FindAllString(cmd, -1) + + line.AppendHistory(cmd) + if len(cmds) == 0 { return false } else { - line.AppendHistory(cmd) args := make([]string, len(cmds[1:])) |
