aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
authorBl1tz23 <alex3angle@gmail.com>2021-08-10 13:45:24 +0300
committerBl1tz23 <alex3angle@gmail.com>2021-08-10 13:45:24 +0300
commit1c94b3d01340baad000188550fcf2ccab6ca80e5 (patch)
tree12c3da17eb2d1a43fef78021a3d7c79110b0ff5f /weed/shell
parente6e57db530217ff57b3622b4672b03ebb6313e96 (diff)
parentf9cf9b93d32a2b01bc4d95ce7d24d86ef60be668 (diff)
downloadseaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.tar.xz
seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.zip
merge master, resolve conflicts
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_collection_list.go54
-rw-r--r--weed/shell/command_ec_decode.go15
-rw-r--r--weed/shell/command_fs_configure.go52
-rw-r--r--weed/shell/command_fs_mkdir.go57
-rw-r--r--weed/shell/command_fs_rm.go100
-rw-r--r--weed/shell/command_remote_cache.go151
-rw-r--r--weed/shell/command_remote_configure.go153
-rw-r--r--weed/shell/command_remote_mount.go249
-rw-r--r--weed/shell/command_remote_uncache.go99
-rw-r--r--weed/shell/command_remote_unmount.go146
-rw-r--r--weed/shell/command_s3_bucket_delete.go13
-rw-r--r--weed/shell/command_s3_configure.go2
-rw-r--r--weed/shell/command_volume_balance.go14
-rw-r--r--weed/shell/command_volume_balance_test.go13
-rw-r--r--weed/shell/command_volume_check_disk.go64
-rw-r--r--weed/shell/command_volume_delete_empty.go74
-rw-r--r--weed/shell/command_volume_fix_replication.go101
-rw-r--r--weed/shell/command_volume_fsck.go195
-rw-r--r--weed/shell/command_volume_list_test.go2
-rw-r--r--weed/shell/command_volume_move.go10
-rw-r--r--weed/shell/command_volume_server_evacuate.go9
-rw-r--r--weed/shell/command_volume_tier_move.go90
-rw-r--r--weed/shell/commands.go9
-rw-r--r--weed/shell/shell_liner.go7
24 files changed, 1521 insertions, 158 deletions
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
index 2a114e61b..ba502a6b9 100644
--- a/weed/shell/command_collection_list.go
+++ b/weed/shell/command_collection_list.go
@@ -22,6 +22,14 @@ func (c *commandCollectionList) Help() string {
return `list all collections`
}
+type CollectionInfo struct {
+ FileCount uint64
+ DeleteCount uint64
+ DeletedByteCount uint64
+ Size uint64
+ VolumeCount int
+}
+
func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
collections, err := ListCollectionNames(commandEnv, true, true)
@@ -30,8 +38,21 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer
return err
}
+ topologyInfo, _, err := collectTopologyInfo(commandEnv)
+ if err != nil {
+ return err
+ }
+
+ collectionInfos := make(map[string]*CollectionInfo)
+
+ writeCollectionInfo(writer, topologyInfo, collectionInfos)
+
for _, c := range collections {
- fmt.Fprintf(writer, "collection:\"%s\"\n", c)
+ cif, found := collectionInfos[c]
+ if !found {
+ continue
+ }
+ fmt.Fprintf(writer, "collection:\"%s\"\tvolumeCount:%d\tsize:%d\tfileCount:%d\tdeletedBytes:%d\tdeletion:%d\n", c, cif.VolumeCount, cif.Size, cif.FileCount, cif.DeletedByteCount, cif.DeleteCount)
}
fmt.Fprintf(writer, "Total %d collections.\n", len(collections))
@@ -56,3 +77,34 @@ func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEc
}
return
}
+
+func addToCollection(collectionInfos map[string]*CollectionInfo, vif *master_pb.VolumeInformationMessage) {
+ c := vif.Collection
+ cif, found := collectionInfos[c]
+ if !found {
+ cif = &CollectionInfo{}
+ collectionInfos[c] = cif
+ }
+ cif.Size += vif.Size
+ cif.DeleteCount += vif.DeleteCount
+ cif.FileCount += vif.FileCount
+ cif.DeletedByteCount += vif.DeletedByteCount
+ cif.VolumeCount++
+}
+
+func writeCollectionInfo(writer io.Writer, t *master_pb.TopologyInfo, collectionInfos map[string]*CollectionInfo) {
+ for _, dc := range t.DataCenterInfos {
+ for _, r := range dc.RackInfos {
+ for _, dn := range r.DataNodeInfos {
+ for _, diskInfo := range dn.DiskInfos {
+ for _, vi := range diskInfo.VolumeInfos {
+ addToCollection(collectionInfos, vi)
+ }
+ //for _, ecShardInfo := range diskInfo.EcShardInfos {
+ //
+ //}
+ }
+ }
+ }
+ }
+}
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index dafdb041a..e4d597d84 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -223,21 +223,6 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn
}
-func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
-
- eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
- if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
- for _, v := range diskInfo.EcShardInfos {
- if v.Collection == selectedCollection && v.Id == uint32(vid) {
- ecShardInfos = append(ecShardInfos, v)
- }
- }
- }
- })
-
- return
-}
-
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
vidMap := make(map[uint32]bool)
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index 02cd7ac69..0aae51d74 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -30,17 +30,17 @@ func (c *commandFsConfigure) Help() string {
fs.configure
# trying the changes and see the possible configuration file content
- fs.configure -locationPrfix=/my/folder -collection=abc
- fs.configure -locationPrfix=/my/folder -collection=abc -ttl=7d
+ fs.configure -locationPrefix=/my/folder -collection=abc
+ fs.configure -locationPrefix=/my/folder -collection=abc -ttl=7d
# example: configure adding only 1 physical volume for each bucket collection
- fs.configure -locationPrfix=/buckets/ -volumeGrowthCount=1
+ fs.configure -locationPrefix=/buckets/ -volumeGrowthCount=1
# apply the changes
- fs.configure -locationPrfix=/my/folder -collection=abc -apply
+ fs.configure -locationPrefix=/my/folder -collection=abc -apply
# delete the changes
- fs.configure -locationPrfix=/my/folder -delete -apply
+ fs.configure -locationPrefix=/my/folder -delete -apply
`
}
@@ -54,6 +54,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl")
diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes")
+ isReadOnly := fsConfigureCommand.Bool("readOnly", false, "disable writes")
volumeGrowthCount := fsConfigureCommand.Int("volumeGrowthCount", 0, "the number of physical volumes to add if no writable volumes")
isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix")
apply := fsConfigureCommand.Bool("apply", false, "update and apply filer configuration")
@@ -61,20 +62,11 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
return nil
}
- var buf bytes.Buffer
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- return filer.ReadEntry(commandEnv.MasterClient, client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, &buf)
- }); err != nil && err != filer_pb.ErrNotFound {
+ fc, err := readFilerConf(commandEnv)
+ if err != nil {
return err
}
- fc := filer.NewFilerConf()
- if buf.Len() > 0 {
- if err = fc.LoadFromBytes(buf.Bytes()); err != nil {
- return err
- }
- }
-
if *locationPrefix != "" {
locConf := &filer_pb.FilerConf_PathConf{
LocationPrefix: *locationPrefix,
@@ -84,6 +76,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
Fsync: *fsync,
DiskType: *diskType,
VolumeGrowthCount: uint32(*volumeGrowthCount),
+ ReadOnly: *isReadOnly,
}
// check collection
@@ -110,15 +103,17 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
}
}
- buf.Reset()
- fc.ToText(&buf)
+ var buf2 bytes.Buffer
+ fc.ToText(&buf2)
- fmt.Fprintf(writer, string(buf.Bytes()))
+ fmt.Fprintf(writer, string(buf2.Bytes()))
fmt.Fprintln(writer)
if *apply {
- if err := filer.SaveAs(commandEnv.option.FilerHost, int(commandEnv.option.FilerPort), filer.DirectoryEtcSeaweedFS, filer.FilerConfName, "text/plain; charset=utf-8", &buf); err != nil {
+ if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes())
+ }); err != nil && err != filer_pb.ErrNotFound {
return err
}
@@ -127,3 +122,20 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
return nil
}
+
+func readFilerConf(commandEnv *CommandEnv) (*filer.FilerConf, error) {
+ var buf bytes.Buffer
+ if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filer.ReadEntry(commandEnv.MasterClient, client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, &buf)
+ }); err != nil && err != filer_pb.ErrNotFound {
+ return nil, fmt.Errorf("read %s/%s: %v", filer.DirectoryEtcSeaweedFS, filer.FilerConfName, err)
+ }
+
+ fc := filer.NewFilerConf()
+ if buf.Len() > 0 {
+ if err := fc.LoadFromBytes(buf.Bytes()); err != nil {
+ return nil, fmt.Errorf("parse %s/%s: %v", filer.DirectoryEtcSeaweedFS, filer.FilerConfName, err)
+ }
+ }
+ return fc, nil
+}
diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go
new file mode 100644
index 000000000..11b663eec
--- /dev/null
+++ b/weed/shell/command_fs_mkdir.go
@@ -0,0 +1,57 @@
+package shell
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "os"
+ "time"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsMkdir{})
+}
+
+type commandFsMkdir struct {
+}
+
+func (c *commandFsMkdir) Name() string {
+ return "fs.mkdir"
+}
+
+func (c *commandFsMkdir) Help() string {
+ return `create a directory
+
+ fs.mkdir path/to/dir
+`
+}
+
+func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
+ if err != nil {
+ return err
+ }
+
+ dir, name := util.FullPath(path).DirAndName()
+
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ _, createErr := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(0777 | os.ModeDir),
+ },
+ },
+ })
+ return createErr
+ })
+
+ return
+}
diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go
new file mode 100644
index 000000000..b383366ca
--- /dev/null
+++ b/weed/shell/command_fs_rm.go
@@ -0,0 +1,100 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsRm{})
+}
+
+type commandFsRm struct {
+}
+
+func (c *commandFsRm) Name() string {
+ return "fs.rm"
+}
+
+func (c *commandFsRm) Help() string {
+ return `remove file and directory entries
+
+ fs.rm [-rf] <entry1> <entry2> ...
+
+ fs.rm /dir/file_name1 dir/file_name2
+ fs.rm /dir
+
+ The option "-r" can be recursive.
+ The option "-f" can be ignored by recursive error.
+`
+}
+
+func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ isRecursive := false
+ ignoreRecursiveError := false
+ var entiries []string
+ for _, arg := range args {
+ if !strings.HasPrefix(arg, "-") {
+ entiries = append(entiries, arg)
+ continue
+ }
+ for _, t := range arg {
+ switch t {
+ case 'r':
+ isRecursive = true
+ case 'f':
+ ignoreRecursiveError = true
+ }
+ }
+ }
+ if len(entiries) < 1 {
+ return fmt.Errorf("need to have arguments")
+ }
+
+ commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ for _, entry := range entiries {
+ targetPath, err := commandEnv.parseUrl(entry)
+ if err != nil {
+ fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err)
+ continue
+ }
+
+ targetDir, targetName := util.FullPath(targetPath).DirAndName()
+
+ lookupRequest := &filer_pb.LookupDirectoryEntryRequest{
+ Directory: targetDir,
+ Name: targetName,
+ }
+ _, err = filer_pb.LookupEntry(client, lookupRequest)
+ if err != nil {
+ fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err)
+ continue
+ }
+
+ request := &filer_pb.DeleteEntryRequest{
+ Directory: targetDir,
+ Name: targetName,
+ IgnoreRecursiveError: ignoreRecursiveError,
+ IsDeleteData: true,
+ IsRecursive: isRecursive,
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ }
+ if resp, err := client.DeleteEntry(context.Background(), request); err != nil {
+ fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err)
+ } else {
+ if resp.Error != "" {
+ fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, resp.Error)
+ }
+ }
+ }
+ return nil
+ })
+
+ return
+}
diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go
new file mode 100644
index 000000000..a964e994c
--- /dev/null
+++ b/weed/shell/command_remote_cache.go
@@ -0,0 +1,151 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "strings"
+)
+
+func init() {
+ Commands = append(Commands, &commandRemoteCache{})
+}
+
+type commandRemoteCache struct {
+}
+
+func (c *commandRemoteCache) Name() string {
+ return "remote.cache"
+}
+
+func (c *commandRemoteCache) Help() string {
+ return `cache the file content for mounted directories or files
+
+ # assume a remote storage is configured to name "cloud1"
+ remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy
+ # mount and pull one bucket
+ remote.mount -dir=xxx -remote=cloud1/bucket
+
+ # after mount, run one of these command to cache the content of the files
+ remote.cache -dir=xxx
+ remote.cache -dir=xxx/some/sub/dir
+
+`
+}
+
+func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+
+ dir := remoteMountCommand.String("dir", "", "a directory in filer")
+
+ if err = remoteMountCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
+ if listErr != nil {
+ return listErr
+ }
+ if *dir == "" {
+ jsonPrintln(writer, mappings)
+ fmt.Fprintln(writer, "need to specify '-dir' option")
+ return nil
+ }
+
+ var localMountedDir string
+ var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
+ for k, loc := range mappings.Mappings {
+ if strings.HasPrefix(*dir, k) {
+ localMountedDir, remoteStorageMountedLocation = k, loc
+ }
+ }
+ if localMountedDir == "" {
+ jsonPrintln(writer, mappings)
+ fmt.Fprintf(writer, "%s is not mounted\n", *dir)
+ return nil
+ }
+
+ // find remote storage configuration
+ remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name)
+ if err != nil {
+ return err
+ }
+
+ // pull content from remote
+ if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil {
+ return fmt.Errorf("cache content data: %v", err)
+ }
+
+ return nil
+}
+
+func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {
+
+ err = filer_pb.ReadDirAllEntries(filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error {
+ if entry.IsDirectory {
+ if !visitEntry(dirPath, entry) {
+ return nil
+ }
+ subDir := dirPath.Child(entry.Name)
+ if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil {
+ return err
+ }
+ } else {
+ if !visitEntry(dirPath, entry) {
+ return nil
+ }
+ }
+ return nil
+ })
+ return
+}
+
+func shouldCacheToLocal(entry *filer_pb.Entry) bool {
+ if entry.IsDirectory {
+ return false
+ }
+ if entry.RemoteEntry == nil {
+ return false
+ }
+ if entry.RemoteEntry.LocalMtime == 0 && entry.RemoteEntry.RemoteSize > 0 {
+ return true
+ }
+ return false
+}
+
+func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
+ if entry.IsDirectory {
+ return false
+ }
+ if entry.RemoteEntry == nil {
+ return false
+ }
+ if entry.RemoteEntry.LocalMtime > 0 && len(entry.Chunks) > 0 {
+ return true
+ }
+ return false
+}
+
+func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *filer_pb.RemoteConf) error {
+
+ return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
+ if !shouldCacheToLocal(entry) {
+ return true // true means recursive traversal should continue
+ }
+
+ println(dir, entry.Name)
+
+ remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))
+
+ if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
+ fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err)
+ return false
+ }
+
+ return true
+ })
+}
diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go
new file mode 100644
index 000000000..7a9ad1f65
--- /dev/null
+++ b/weed/shell/command_remote_configure.go
@@ -0,0 +1,153 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/golang/protobuf/proto"
+ "io"
+ "regexp"
+ "strings"
+)
+
+func init() {
+ Commands = append(Commands, &commandRemoteConfigure{})
+}
+
+type commandRemoteConfigure struct {
+}
+
+func (c *commandRemoteConfigure) Name() string {
+ return "remote.configure"
+}
+
+func (c *commandRemoteConfigure) Help() string {
+ return `remote storage configuration
+
+ # see the current configurations
+ remote.configure
+
+ # set or update a configuration
+ remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy
+
+ # delete one configuration
+ remote.configure -delete -name=cloud1
+
+`
+}
+
+var (
+ isAlpha = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9]*$`).MatchString
+)
+
+func (c *commandRemoteConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ conf := &filer_pb.RemoteConf{}
+
+ remoteConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ isDelete := remoteConfigureCommand.Bool("delete", false, "delete one remote storage by its name")
+
+ remoteConfigureCommand.StringVar(&conf.Name, "name", "", "a short name to identify the remote storage")
+ remoteConfigureCommand.StringVar(&conf.Type, "type", "s3", "storage type, currently only support s3")
+
+ remoteConfigureCommand.StringVar(&conf.S3AccessKey, "s3.access_key", "", "s3 access key")
+ remoteConfigureCommand.StringVar(&conf.S3SecretKey, "s3.secret_key", "", "s3 secret key")
+ remoteConfigureCommand.StringVar(&conf.S3Region, "s3.region", "us-east-2", "s3 region")
+ remoteConfigureCommand.StringVar(&conf.S3Endpoint, "s3.endpoint", "", "endpoint for s3-compatible local object store")
+
+ if err = remoteConfigureCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if conf.Name == "" {
+ return c.listExistingRemoteStorages(commandEnv, writer)
+ }
+
+ if !isAlpha(conf.Name) {
+ return fmt.Errorf("only letters and numbers allowed in name: %v", conf.Name)
+ }
+
+ if *isDelete {
+ return c.deleteRemoteStorage(commandEnv, writer, conf.Name)
+ }
+
+ return c.saveRemoteStorage(commandEnv, writer, conf)
+
+}
+
+func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandEnv, writer io.Writer) error {
+
+ return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(filer.DirectoryEtcRemote), "", func(entry *filer_pb.Entry, isLast bool) error {
+ if len(entry.Content) == 0 {
+ fmt.Fprintf(writer, "skipping %s\n", entry.Name)
+ return nil
+ }
+ if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
+ return nil
+ }
+ conf := &filer_pb.RemoteConf{}
+
+ if err := proto.Unmarshal(entry.Content, conf); err != nil {
+ return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
+ }
+
+ conf.S3SecretKey = ""
+
+ m := jsonpb.Marshaler{
+ EmitDefaults: false,
+ Indent: " ",
+ }
+
+ err := m.Marshal(writer, conf)
+ fmt.Fprintln(writer)
+
+ return err
+ })
+
+}
+
+func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error {
+
+ return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.DeleteEntryRequest{
+ Directory: filer.DirectoryEtcRemote,
+ Name: storageName + filer.REMOTE_STORAGE_CONF_SUFFIX,
+ IgnoreRecursiveError: false,
+ IsDeleteData: true,
+ IsRecursive: true,
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ }
+ _, err := client.DeleteEntry(context.Background(), request)
+
+ if err == nil {
+ fmt.Fprintf(writer, "removed: %s\n", storageName)
+ }
+
+ return err
+
+ })
+
+}
+
+func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, writer io.Writer, conf *filer_pb.RemoteConf) error {
+
+ data, err := proto.Marshal(conf)
+ if err != nil {
+ return err
+ }
+
+ if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, data)
+ }); err != nil && err != filer_pb.ErrNotFound {
+ return err
+ }
+
+ return nil
+
+}
diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go
new file mode 100644
index 000000000..3b04fec63
--- /dev/null
+++ b/weed/shell/command_remote_mount.go
@@ -0,0 +1,249 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/golang/protobuf/proto"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandRemoteMount{})
+}
+
+type commandRemoteMount struct {
+}
+
+func (c *commandRemoteMount) Name() string {
+ return "remote.mount"
+}
+
+func (c *commandRemoteMount) Help() string {
+ return `mount remote storage and pull its metadata
+
+ # assume a remote storage is configured to name "cloud1"
+ remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy
+
+ # mount and pull one bucket
+ remote.mount -dir=xxx -remote=cloud1/bucket
+ # mount and pull one directory in the bucket
+ remote.mount -dir=xxx -remote=cloud1/bucket/dir1
+
+ # after mount, start a separate process to write updates to remote storage
+ weed filer.remote.sync -filer=<filerHost>:<filerPort> -dir=xxx
+
+`
+}
+
+func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+
+ dir := remoteMountCommand.String("dir", "", "a directory in filer")
+ nonEmpty := remoteMountCommand.Bool("nonempty", false, "allows the mounting over a non-empty directory")
+ remote := remoteMountCommand.String("remote", "", "a directory in remote storage, ex. <storageName>/<bucket>/path/to/dir")
+
+ if err = remoteMountCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *dir == "" {
+ _, err = listExistingRemoteStorageMounts(commandEnv, writer)
+ return err
+ }
+
+ remoteStorageLocation := remote_storage.ParseLocation(*remote)
+
+ // find configuration for remote storage
+ // remotePath is /<bucket>/path/to/dir
+ remoteConf, err := c.findRemoteStorageConfiguration(commandEnv, writer, remoteStorageLocation)
+ if err != nil {
+ return fmt.Errorf("find configuration for %s: %v", *remote, err)
+ }
+
+ // pull metadata from remote
+ if err = c.pullMetadata(commandEnv, writer, *dir, *nonEmpty, remoteConf, remoteStorageLocation); err != nil {
+ return fmt.Errorf("pull metadata: %v", err)
+ }
+
+ // store a mount configuration in filer
+ if err = c.saveMountMapping(commandEnv, writer, *dir, remoteStorageLocation); err != nil {
+ return fmt.Errorf("save mount mapping: %v", err)
+ }
+
+ return nil
+}
+
+func listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (mappings *filer_pb.RemoteStorageMapping, err error) {
+
+ // read current mapping
+ mappings, err = filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
+ if err != nil {
+ return mappings, err
+ }
+
+ jsonPrintln(writer, mappings)
+
+ return
+
+}
+
+func jsonPrintln(writer io.Writer, message proto.Message) error {
+ m := jsonpb.Marshaler{
+ EmitDefaults: false,
+ Indent: " ",
+ }
+
+ err := m.Marshal(writer, message)
+ fmt.Fprintln(writer)
+ return err
+}
+
+func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) {
+
+ return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name)
+
+}
+
+func (c *commandRemoteMount) pullMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *filer_pb.RemoteConf, remote *filer_pb.RemoteStorageLocation) error {
+
+ // find existing directory, and ensure the directory is empty
+ err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ parent, name := util.FullPath(dir).DirAndName()
+ _, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+ Directory: parent,
+ Name: name,
+ })
+ if lookupErr != nil {
+ return fmt.Errorf("lookup %s: %v", dir, lookupErr)
+ }
+
+ mountToDirIsEmpty := true
+ listErr := filer_pb.SeaweedList(client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ mountToDirIsEmpty = false
+ return nil
+ }, "", false, 1)
+
+ if listErr != nil {
+ return fmt.Errorf("list %s: %v", dir, listErr)
+ }
+
+ if !mountToDirIsEmpty {
+ if !nonEmpty {
+ return fmt.Errorf("dir %s is not empty", dir)
+ }
+ }
+
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+
+ // visit remote storage
+ remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
+ if err != nil {
+ return err
+ }
+
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ ctx := context.Background()
+ err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
+ localDir := dir + remoteDir
+ println(util.NewFullPath(localDir, name))
+
+ lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: localDir,
+ Name: name,
+ })
+ var existingEntry *filer_pb.Entry
+ if lookupErr != nil {
+ if lookupErr != filer_pb.ErrNotFound {
+ return lookupErr
+ }
+ } else {
+ existingEntry = lookupResponse.Entry
+ }
+
+ if existingEntry == nil {
+ _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
+ Directory: localDir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: isDirectory,
+ Attributes: &filer_pb.FuseAttributes{
+ FileSize: uint64(remoteEntry.RemoteSize),
+ Mtime: remoteEntry.RemoteMtime,
+ FileMode: uint32(0644),
+ },
+ RemoteEntry: remoteEntry,
+ },
+ })
+ return createErr
+ } else {
+ if existingEntry.RemoteEntry == nil || existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag {
+ return doSaveRemoteEntry(client, localDir, existingEntry, remoteEntry)
+ }
+ }
+ return nil
+ })
+ return err
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (c *commandRemoteMount) saveMountMapping(commandEnv *CommandEnv, writer io.Writer, dir string, remoteStorageLocation *filer_pb.RemoteStorageLocation) (err error) {
+
+ // read current mapping
+ var oldContent, newContent []byte
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE)
+ return err
+ })
+ if err != nil {
+ if err != filer_pb.ErrNotFound {
+ return fmt.Errorf("read existing mapping: %v", err)
+ }
+ }
+
+ // add new mapping
+ newContent, err = filer.AddRemoteStorageMapping(oldContent, dir, remoteStorageLocation)
+ if err != nil {
+ return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err)
+ }
+
+ // save back
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE, newContent)
+ })
+ if err != nil {
+ return fmt.Errorf("save mapping: %v", err)
+ }
+
+ return nil
+}
+
+func doSaveRemoteEntry(client filer_pb.SeaweedFilerClient, localDir string, existingEntry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
+ existingEntry.RemoteEntry = remoteEntry
+ existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)
+ existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime
+ _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
+ Directory: localDir,
+ Entry: existingEntry,
+ })
+ if updateErr != nil {
+ return updateErr
+ }
+ return nil
+}
diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go
new file mode 100644
index 000000000..64cc1472c
--- /dev/null
+++ b/weed/shell/command_remote_uncache.go
@@ -0,0 +1,99 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "strings"
+)
+
+func init() {
+ Commands = append(Commands, &commandRemoteUncache{})
+}
+
+type commandRemoteUncache struct {
+}
+
+func (c *commandRemoteUncache) Name() string {
+ return "remote.uncache"
+}
+
+func (c *commandRemoteUncache) Help() string {
+ return `keep the metadata but remote cache the file content for mounted directories or files
+
+ remote.uncache -dir=xxx
+ remote.uncache -dir=xxx/some/sub/dir
+
+`
+}
+
+func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+
+ dir := remoteMountCommand.String("dir", "", "a directory in filer")
+
+ if err = remoteMountCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
+ if listErr != nil {
+ return listErr
+ }
+ if *dir == "" {
+ jsonPrintln(writer, mappings)
+ fmt.Fprintln(writer, "need to specify '-dir' option")
+ return nil
+ }
+
+ var localMountedDir string
+ for k := range mappings.Mappings {
+ if strings.HasPrefix(*dir, k) {
+ localMountedDir = k
+ }
+ }
+ if localMountedDir == "" {
+ jsonPrintln(writer, mappings)
+ fmt.Fprintf(writer, "%s is not mounted\n", *dir)
+ return nil
+ }
+
+ // pull content from remote
+ if err = c.uncacheContentData(commandEnv, writer, util.FullPath(*dir)); err != nil {
+ return fmt.Errorf("cache content data: %v", err)
+ }
+
+ return nil
+}
+
+func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer io.Writer, dirToCache util.FullPath) error {
+
+ return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
+ if !mayHaveCachedToLocal(entry) {
+ return true // true means recursive traversal should continue
+ }
+ entry.RemoteEntry.LocalMtime = 0
+ entry.Chunks = nil
+
+ println(dir, entry.Name)
+
+ err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
+ Directory: string(dir),
+ Entry: entry,
+ })
+ return updateErr
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "uncache %+v: %v\n", dir.Child(entry.Name), err)
+ return false
+ }
+
+ return true
+ })
+}
diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go
new file mode 100644
index 000000000..b16da44f1
--- /dev/null
+++ b/weed/shell/command_remote_unmount.go
@@ -0,0 +1,146 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandRemoteUnmount{})
+}
+
+type commandRemoteUnmount struct {
+}
+
+func (c *commandRemoteUnmount) Name() string {
+ return "remote.unmount"
+}
+
+func (c *commandRemoteUnmount) Help() string {
+ return `unmount remote storage
+
+ # assume a remote storage is configured to name "s3_1"
+ remote.configure -name=s3_1 -type=s3 -access_key=xxx -secret_key=yyy
+ # mount and pull one bucket
+ remote.mount -dir=xxx -remote=s3_1/bucket
+
+ # unmount the mounted directory and remove its cache
+ remote.unmount -dir=xxx
+
+`
+}
+
+func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+
+ dir := remoteMountCommand.String("dir", "", "a directory in filer")
+
+ if err = remoteMountCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
+ if listErr != nil {
+ return listErr
+ }
+ if *dir == "" {
+ return jsonPrintln(writer, mappings)
+ }
+
+ _, found := mappings.Mappings[*dir]
+ if !found {
+ return fmt.Errorf("directory %s is not mounted", *dir)
+ }
+
+ // purge mounted data
+ if err = c.purgeMountedData(commandEnv, *dir); err != nil {
+ return fmt.Errorf("purge mounted data: %v", err)
+ }
+
+ // store a mount configuration in filer
+ if err = c.deleteMountMapping(commandEnv, *dir); err != nil {
+ return fmt.Errorf("delete mount mapping: %v", err)
+ }
+
+ return nil
+}
+
+func (c *commandRemoteUnmount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) {
+
+ return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name)
+
+}
+
+func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error {
+
+ // find existing directory, and ensure the directory is empty
+ err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ parent, name := util.FullPath(dir).DirAndName()
+ lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+ Directory: parent,
+ Name: name,
+ })
+ if lookupErr != nil {
+ return fmt.Errorf("lookup %s: %v", dir, lookupErr)
+ }
+
+ oldEntry := lookupResp.Entry
+
+ deleteError := filer_pb.DoRemove(client, parent, name, true, true, true, false, nil)
+ if deleteError != nil {
+ return fmt.Errorf("delete %s: %v", dir, deleteError)
+ }
+
+ mkdirErr := filer_pb.DoMkdir(client, parent, name, func(entry *filer_pb.Entry) {
+ entry.Attributes = oldEntry.Attributes
+ entry.Extended = oldEntry.Extended
+ })
+ if mkdirErr != nil {
+ return fmt.Errorf("mkdir %s: %v", dir, mkdirErr)
+ }
+
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (c *commandRemoteUnmount) deleteMountMapping(commandEnv *CommandEnv, dir string) (err error) {
+
+ // read current mapping
+ var oldContent, newContent []byte
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE)
+ return err
+ })
+ if err != nil {
+ if err != filer_pb.ErrNotFound {
+ return fmt.Errorf("read existing mapping: %v", err)
+ }
+ }
+
+ // add new mapping
+ newContent, err = filer.RemoveRemoteStorageMapping(oldContent, dir)
+ if err != nil {
+ return fmt.Errorf("delete mount %s: %v", dir, err)
+ }
+
+ // save back
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE, newContent)
+ })
+ if err != nil {
+ return fmt.Errorf("save mapping: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go
index a8d8c5c29..26953c249 100644
--- a/weed/shell/command_s3_bucket_delete.go
+++ b/weed/shell/command_s3_bucket_delete.go
@@ -1,8 +1,10 @@
package shell
import (
+ "context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"io"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -49,6 +51,17 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer
return fmt.Errorf("read buckets: %v", err)
}
+ // delete the collection directly first
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+ Name: *bucketName,
+ })
+ return err
+ })
+ if err != nil {
+ return
+ }
+
return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false, nil)
}
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index ca51ef72f..5eab2ebd0 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -164,7 +164,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
}
buf.Reset()
- filer.S3ConfigurationToText(&buf, s3cfg)
+ filer.ProtoToText(&buf, s3cfg)
fmt.Fprintf(writer, string(buf.Bytes()))
fmt.Fprintln(writer)
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index ad7da0e44..162b66556 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -120,7 +120,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, vo
func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
- // balance writable volumes
+ // balance read only volumes
for _, n := range nodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
if collection != "ALL_COLLECTIONS" {
@@ -128,14 +128,14 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskT
return false
}
}
- return v.DiskType == string(diskType) && (!v.ReadOnly && v.Size < volumeSizeLimit)
+ return v.DiskType == string(diskType) && (v.ReadOnly || v.Size >= volumeSizeLimit)
})
}
- if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortWritableVolumes, applyBalancing); err != nil {
+ if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortReadOnlyVolumes, applyBalancing); err != nil {
return err
}
- // balance readable volumes
+ // balance writable volumes
for _, n := range nodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
if collection != "ALL_COLLECTIONS" {
@@ -143,10 +143,10 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskT
return false
}
}
- return v.DiskType == string(diskType) && (v.ReadOnly || v.Size >= volumeSizeLimit)
+ return v.DiskType == string(diskType) && (!v.ReadOnly && v.Size < volumeSizeLimit)
})
}
- if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortReadOnlyVolumes, applyBalancing); err != nil {
+ if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortWritableVolumes, applyBalancing); err != nil {
return err
}
@@ -340,7 +340,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
}
fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyChange {
- return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType, false)
}
return nil
}
diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go
index b77811f51..9732e9bb7 100644
--- a/weed/shell/command_volume_balance_test.go
+++ b/weed/shell/command_volume_balance_test.go
@@ -1,6 +1,8 @@
package shell
import (
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/stretchr/testify/assert"
"testing"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -181,3 +183,14 @@ func TestBalance(t *testing.T) {
}
}
+
+func TestVolumeSelection(t *testing.T) {
+ topologyInfo := parseOutput(topoData)
+
+ vids, err := collectVolumeIdsForTierChange(nil, topologyInfo, 1000, types.ToDiskType("hdd"), "", 20.0, 0)
+ if err != nil {
+ t.Errorf("collectVolumeIdsForTierChange: %v", err)
+ }
+ assert.Equal(t, 378, len(vids))
+
+}
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 0f156ac2f..7e060f3d3 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -89,25 +89,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
continue
}
- // reset index db
- aDB.Close()
- bDB.Close()
- aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb()
-
- // read index db
- if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil {
- return err
- }
- if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil {
- return err
- }
-
- // find and make up the differnces
- if err := c.doVolumeCheckDisk(aDB, bDB, a, b, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil {
- return err
- }
- if err := c.doVolumeCheckDisk(bDB, aDB, b, a, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil {
- return err
+ if err := c.syncTwoReplicas(aDB, bDB, a, verbose, writer, b, err, applyChanges, nonRepairThreshold); err != nil {
+ fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
}
replicas = replicas[1:]
}
@@ -116,7 +99,34 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
return nil
}
-func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, nonRepairThreshold float64) error {
+func (c *commandVolumeCheckDisk) syncTwoReplicas(aDB *needle_map.MemDb, bDB *needle_map.MemDb, a *VolumeReplica, verbose *bool, writer io.Writer, b *VolumeReplica, err error, applyChanges *bool, nonRepairThreshold *float64) error {
+ aHasChanges, bHasChanges := true, true
+ for aHasChanges || bHasChanges {
+ // reset index db
+ aDB.Close()
+ bDB.Close()
+ aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb()
+
+ // read index db
+ if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil {
+ return err
+ }
+ if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil {
+ return err
+ }
+
+ // find and make up the differences
+ if aHasChanges, err = c.doVolumeCheckDisk(aDB, bDB, a, b, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil {
+ return err
+ }
+ if bHasChanges, err = c.doVolumeCheckDisk(bDB, aDB, b, a, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, nonRepairThreshold float64) (hasChanges bool, err error) {
// find missing keys
// hash join, can be more efficient
@@ -133,12 +143,12 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m
fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d entries\n", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles))
if counter == 0 || len(missingNeedles) == 0 {
- return nil
+ return false, nil
}
missingNeedlesFraction := float64(len(missingNeedles)) / float64(counter)
if missingNeedlesFraction > nonRepairThreshold {
- return fmt.Errorf(
+ return false, fmt.Errorf(
"failed to start repair volume %d, percentage of missing keys is greater than the threshold: %.2f > %.2f",
source.info.Id, missingNeedlesFraction, nonRepairThreshold)
}
@@ -147,7 +157,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m
needleBlob, err := c.readSourceNeedleBlob(source.location.dataNode.Id, source.info.Id, needleValue)
if err != nil {
- return err
+ return hasChanges, err
}
if !applyChanges {
@@ -158,13 +168,15 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m
fmt.Fprintf(writer, "read %d,%x %s => %s \n", source.info.Id, needleValue.Key, source.location.dataNode.Id, target.location.dataNode.Id)
}
- if err := c.writeNeedleBlobToTarget(target.location.dataNode.Id, source.info.Id, needleValue, needleBlob); err != nil {
- return err
+ hasChanges = true
+
+ if err = c.writeNeedleBlobToTarget(target.location.dataNode.Id, source.info.Id, needleValue, needleBlob); err != nil {
+ return hasChanges, err
}
}
- return nil
+ return
}
func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer string, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
diff --git a/weed/shell/command_volume_delete_empty.go b/weed/shell/command_volume_delete_empty.go
new file mode 100644
index 000000000..079915f66
--- /dev/null
+++ b/weed/shell/command_volume_delete_empty.go
@@ -0,0 +1,74 @@
+package shell
+
+import (
+ "flag"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "io"
+ "log"
+ "time"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeDeleteEmpty{})
+}
+
+type commandVolumeDeleteEmpty struct {
+}
+
+func (c *commandVolumeDeleteEmpty) Name() string {
+ return "volume.deleteEmpty"
+}
+
+func (c *commandVolumeDeleteEmpty) Help() string {
+ return `delete empty volumes from all volume servers
+
+ volume.deleteEmpty -quietFor=24h -force
+
+ This command deletes all empty volumes from one volume server.
+
+`
+}
+
+func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ quietPeriod := volDeleteCommand.Duration("quietFor", 24*time.Hour, "select empty volumes with no recent writes, avoid newly created ones")
+ applyBalancing := volDeleteCommand.Bool("force", false, "apply to delete empty volumes")
+ if err = volDeleteCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv)
+ if err != nil {
+ return err
+ }
+
+ quietSeconds := int64(*quietPeriod / time.Second)
+ nowUnixSeconds := time.Now().Unix()
+
+ eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, diskInfo := range dn.DiskInfos {
+ for _, v := range diskInfo.VolumeInfos {
+ if v.Size <= 8 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
+ if *applyBalancing {
+ log.Printf("deleting empty volume %d from %s", v.Id, dn.Id)
+ if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), dn.Id); deleteErr != nil {
+ err = deleteErr
+ }
+ continue
+ } else {
+ log.Printf("empty volume %d from %s", v.Id, dn.Id)
+ }
+ }
+ }
+ }
+ })
+
+ return
+}
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 538351fd0..326cb2a40 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -58,6 +58,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
+ retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry")
if err = volFixReplicationCommand.Parse(args); err != nil {
return nil
}
@@ -100,7 +101,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
// find the most under populated data nodes
- return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
+ return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount)
}
@@ -154,64 +155,74 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
return nil
}
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) {
for _, vid := range underReplicatedVolumeIds {
- replicas := volumeReplicas[vid]
- replica := pickOneReplicaToCopyFrom(replicas)
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
- foundNewLocation := false
- hasSkippedCollection := false
- keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
- fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
- for _, dst := range allLocations {
- // check whether data nodes satisfy the constraints
- if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
- // check collection name pattern
- if *c.collectionPattern != "" {
- matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
- if err != nil {
- return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
- }
- if !matched {
- hasSkippedCollection = true
- break
- }
- }
-
- // ask the volume server to replicate the volume
- foundNewLocation = true
- fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
+ for i := 0; i < retryCount+1; i++ {
+ if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {
+ break
+ }
+ }
+ }
+ return
+}
- if !takeAction {
+func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {
+ replicas := volumeReplicas[vid]
+ replica := pickOneReplicaToCopyFrom(replicas)
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+ foundNewLocation := false
+ hasSkippedCollection := false
+ keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
+ fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
+ for _, dst := range allLocations {
+ // check whether data nodes satisfy the constraints
+ if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
+ // check collection name pattern
+ if *c.collectionPattern != "" {
+ matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
+ if err != nil {
+ return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
+ }
+ if !matched {
+ hasSkippedCollection = true
break
}
+ }
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
- VolumeId: replica.info.Id,
- SourceDataNode: replica.location.dataNode.Id,
- })
- if replicateErr != nil {
- return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
- }
- return nil
- })
-
- if err != nil {
- return err
- }
+ // ask the volume server to replicate the volume
+ foundNewLocation = true
+ fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
+ if !takeAction {
// adjust free volume count
dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
break
}
- }
- if !foundNewLocation && !hasSkippedCollection {
- fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
+ VolumeId: replica.info.Id,
+ SourceDataNode: replica.location.dataNode.Id,
+ })
+ if replicateErr != nil {
+ return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
+
+ // adjust free volume count
+ dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
+ break
}
+ }
+ if !foundNewLocation && !hasSkippedCollection {
+ fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
}
return nil
}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 1fbc9ad35..27c253209 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -1,9 +1,11 @@
package shell
import (
+ "bufio"
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
"io/ioutil"
"math"
@@ -44,6 +46,12 @@ func (c *commandVolumeFsck) Help() string {
2. collect all file ids from the filer, as set B
3. find out the set A subtract B
+ If -findMissingChunksInFiler is enabled, this works
+ in a reverse way:
+ 1. collect all file ids from all volumes, as set A
+ 2. collect all file ids from the filer, as set B
+ 3. find out the set B subtract A
+
`
}
@@ -55,6 +63,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
verbose := fsckCommand.Bool("v", false, "verbose mode")
+ findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
+ findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
if err = fsckCommand.Parse(args); err != nil {
return nil
@@ -86,15 +96,108 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
}
}
- // collect all filer file ids
- if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
- return fmt.Errorf("failed to collect file ids from filer: %v", err)
+ if *findMissingChunksInFiler {
+ // collect all filer file ids and paths
+ if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, applyPurging); err != nil {
+ return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
+ }
+ // for each volume, check filer file ids
+ if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
+ return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
+ }
+ } else {
+ // collect all filer file ids
+ if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
+ return fmt.Errorf("failed to collect file ids from filer: %v", err)
+ }
+ // volume file ids substract filer file ids
+ if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
+ return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
+ }
+ }
+
+ return nil
+}
+
+func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, applyPurging *bool) error {
+
+ if verbose {
+ fmt.Fprintf(writer, "checking each file from filer ...\n")
+ }
+
+ files := make(map[uint32]*os.File)
+ for vid := range volumeIdToServer {
+ dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
+ }
+ files[vid] = dst
}
+ defer func() {
+ for _, f := range files {
+ f.Close()
+ }
+ }()
+
+ type Item struct {
+ vid uint32
+ fileKey uint64
+ cookie uint32
+ path util.FullPath
+ }
+ return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(outputChan chan interface{}) {
+ buffer := make([]byte, 16)
+ for item := range outputChan {
+ i := item.(*Item)
+ if f, ok := files[i.vid]; ok {
+ util.Uint64toBytes(buffer, i.fileKey)
+ util.Uint32toBytes(buffer[8:], i.cookie)
+ util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
+ f.Write(buffer)
+ f.Write([]byte(i.path))
+ // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
+ } else {
+ fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
+ }
+ }
+ }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ if verbose && entry.Entry.IsDirectory {
+ fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
+ }
+ dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
+ if resolveErr != nil {
+ return nil
+ }
+ dChunks = append(dChunks, mChunks...)
+ for _, chunk := range dChunks {
+ outputChan <- &Item{
+ vid: chunk.Fid.VolumeId,
+ fileKey: chunk.Fid.FileKey,
+ cookie: chunk.Fid.Cookie,
+ path: util.NewFullPath(entry.Dir, entry.Entry.Name),
+ }
+ }
+ return nil
+ })
- // volume file ids substract filer file ids
+ return nil
+}
+
+func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
+
+ for volumeId, vinfo := range volumeIdToVInfo {
+ checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose)
+ if checkErr != nil {
+ return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
+ }
+ }
+ return nil
+}
+
+func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
for volumeId, vinfo := range volumeIdToVInfo {
- inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose)
+ inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose)
if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
}
@@ -102,9 +205,9 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
totalOrphanChunkCount += uint64(len(orphanFileIds))
totalOrphanDataSize += orphanDataSize
- if *verbose {
+ if verbose {
for _, fid := range orphanFileIds {
- fmt.Fprintf(writer, "%sxxxxxxxx\n", fid)
+ fmt.Fprintf(writer, "%s\n", fid)
}
}
@@ -112,8 +215,14 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
if vinfo.isEcVolume {
fmt.Fprintf(writer, "Skip purging for Erasure Coded volumes.\n")
}
- if err = c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
- return fmt.Errorf("purge for volume %d: %v\n", volumeId, err)
+ if inUseCount == 0 {
+ if err := deleteVolume(c.env.option.GrpcDialOption, needle.VolumeId(volumeId), vinfo.server); err != nil {
+ return fmt.Errorf("delete volume %d: %v\n", volumeId, err)
+ }
+ } else {
+ if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
+ return fmt.Errorf("purge for volume %d: %v\n", volumeId, err)
+ }
}
}
}
@@ -130,7 +239,6 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
}
-
return nil
}
@@ -203,7 +311,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
files[i.vid].Write(buffer)
}
}, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
- dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
+ dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
if resolveErr != nil {
return nil
}
@@ -218,6 +326,69 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
})
}
+func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (err error) {
+
+ if verbose {
+ fmt.Fprintf(writer, "find missing file chuns in volume %d ...\n", volumeId)
+ }
+
+ db := needle_map.NewMemDb()
+ defer db.Close()
+
+ if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
+ return
+ }
+
+ file := getFilerFileIdFile(tempFolder, volumeId)
+ fp, err := os.Open(file)
+ if err != nil {
+ return
+ }
+ defer fp.Close()
+
+ type Item struct {
+ fileKey uint64
+ cookie uint32
+ path util.FullPath
+ }
+
+ br := bufio.NewReader(fp)
+ buffer := make([]byte, 16)
+ item := &Item{}
+ var readSize int
+ for {
+ readSize, err = io.ReadFull(br, buffer)
+ if err != nil || readSize != 16 {
+ if err == io.EOF {
+ return nil
+ } else {
+ break
+ }
+ }
+
+ item.fileKey = util.BytesToUint64(buffer[:8])
+ item.cookie = util.BytesToUint32(buffer[8:12])
+ pathSize := util.BytesToUint32(buffer[12:16])
+ pathBytes := make([]byte, int(pathSize))
+ n, err := io.ReadFull(br, pathBytes)
+ if err != nil {
+ fmt.Fprintf(writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
+ }
+ if n != int(pathSize) {
+ fmt.Fprintf(writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
+ }
+ item.path = util.FullPath(string(pathBytes))
+
+ if _, found := db.Get(types.NeedleId(item.fileKey)); !found {
+ fmt.Fprintf(writer, "%d,%x%08x in %s %d not found\n", volumeId, item.fileKey, item.cookie, item.path, pathSize)
+ }
+
+ }
+
+ return
+
+}
+
func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
db := needle_map.NewMemDb()
@@ -246,7 +417,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri
var orphanFileCount uint64
db.AscendingVisit(func(n needle_map.NeedleValue) error {
// fmt.Printf("%d,%x\n", volumeId, n.Key)
- orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String()))
+ orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String()))
orphanFileCount++
orphanDataSize += uint64(n.Size)
return nil
diff --git a/weed/shell/command_volume_list_test.go b/weed/shell/command_volume_list_test.go
index 72c76f242..379cf4943 100644
--- a/weed/shell/command_volume_list_test.go
+++ b/weed/shell/command_volume_list_test.go
@@ -68,7 +68,7 @@ func parseOutput(output string) *master_pb.TopologyInfo {
maxVolumeCount, _ := strconv.Atoi(maxVolumeCountStr)
disk = &master_pb.DiskInfo{
Type: diskType,
- MaxVolumeCount: uint64(maxVolumeCount),
+ MaxVolumeCount: int64(maxVolumeCount),
}
dn.DiskInfos[types.ToDiskType(diskType).String()] = disk
} else {
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 84f33db34..666e3e867 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -69,11 +69,11 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
- return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr, false)
}
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
-func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string) (err error) {
+func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
@@ -83,7 +83,11 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, so
log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
- return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
+ if skipTailError {
+ fmt.Fprintf(writer, "tail volume %d from %s to %s: %v\n", volumeId, sourceVolumeServer, targetVolumeServer, err)
+ } else {
+ return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
+ }
}
log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index f21d0334c..6e0c19ae1 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -52,6 +52,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
+ retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
if err = vsEvacuateCommand.Parse(args); err != nil {
return nil
}
@@ -60,7 +61,13 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
}
- return volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer)
+ for i := 0; i < *retryCount+1; i++ {
+ if err = volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
+ return nil
+ }
+ }
+
+ return
}
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index d6a49d6e1..d370d93e4 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -7,6 +7,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
+ "path/filepath"
+ "sync"
"time"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -17,6 +19,9 @@ func init() {
}
type commandVolumeTierMove struct {
+ activeServers map[string]struct{}
+ activeServersLock sync.Mutex
+ activeServersCond *sync.Cond
}
func (c *commandVolumeTierMove) Name() string {
@@ -26,7 +31,7 @@ func (c *commandVolumeTierMove) Name() string {
func (c *commandVolumeTierMove) Help() string {
return `change a volume from one disk type to another
- volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h]
+ volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h]
Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
So "volume.fix.replication" and "volume.balance" should be followed.
@@ -36,12 +41,15 @@ func (c *commandVolumeTierMove) Help() string {
func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ c.activeServers = make(map[string]struct{})
+ c.activeServersCond = sync.NewCond(new(sync.Mutex))
+
if err = commandEnv.confirmIsLocked(); err != nil {
return
}
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
- collection := tierCommand.String("collection", "", "the collection name")
+ collectionPattern := tierCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
source := tierCommand.String("fromDiskType", "", "the source disk type")
@@ -65,7 +73,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
}
// collect all volumes that should change
- volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collection, *fullPercentage, *quietPeriod)
+ volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
@@ -73,7 +81,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
_, allLocations := collectVolumeReplicaLocations(topologyInfo)
for _, vid := range volumeIds {
- if err = doVolumeTierMove(commandEnv, writer, *collection, vid, toDiskType, allLocations, *applyChange); err != nil {
+ if err = c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations, *applyChange); err != nil {
fmt.Printf("tier move volume %d: %v\n", vid, err)
}
}
@@ -90,7 +98,7 @@ func isOneOf(server string, locations []wdclient.Location) bool {
return false
}
-func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
+func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
@@ -120,25 +128,32 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin
hasFoundTarget = true
if !applyChanges {
+ // adjust volume count
+ dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
break
}
- // mark all replicas as read only
- if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil {
- return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
- }
- if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString()); err != nil {
- return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
+ c.activeServersCond.L.Lock()
+ _, isSourceActive := c.activeServers[sourceVolumeServer]
+ _, isDestActive := c.activeServers[dst.dataNode.Id]
+ for isSourceActive || isDestActive {
+ c.activeServersCond.Wait()
+ _, isSourceActive = c.activeServers[sourceVolumeServer]
+ _, isDestActive = c.activeServers[dst.dataNode.Id]
}
+ c.activeServers[sourceVolumeServer] = struct{}{}
+ c.activeServers[dst.dataNode.Id] = struct{}{}
+ c.activeServersCond.L.Unlock()
- // remove the remaining replicas
- for _, loc := range locations {
- if loc.Url != dst.dataNode.Id {
- if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
- fmt.Fprintf(writer, "failed to delete volume %d on %s\n", vid, loc.Url)
- }
+ go func(dst location) {
+ if err := c.doMoveOneVolume(commandEnv, writer, vid, toDiskType, locations, sourceVolumeServer, dst); err != nil {
+ fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", vid, sourceVolumeServer, dst.dataNode.Id, err)
}
- }
+ delete(c.activeServers, sourceVolumeServer)
+ delete(c.activeServers, dst.dataNode.Id)
+ c.activeServersCond.Signal()
+ }(dst)
+
}
}
@@ -149,7 +164,31 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin
return nil
}
-func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer string, dst location) (err error) {
+
+ // mark all replicas as read only
+ if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil {
+ return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
+ }
+ if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString(), true); err != nil {
+ return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
+ }
+
+ // adjust volume count
+ dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
+
+ // remove the remaining replicas
+ for _, loc := range locations {
+ if loc.Url != dst.dataNode.Id && loc.Url != sourceVolumeServer {
+ if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
+ fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
+ }
+ }
+ }
+ return nil
+}
+
+func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
@@ -160,7 +199,18 @@ func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
- if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
+ // check collection name pattern
+ if collectionPattern != "" {
+ matched, err := filepath.Match(collectionPattern, v.Collection)
+ if err != nil {
+ return
+ }
+ if !matched {
+ continue
+ }
+ }
+
+ if v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
vidMap[v.Id] = true
}
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 0e285214b..5497e89cc 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -20,9 +20,10 @@ type ShellOptions struct {
Masters *string
GrpcDialOption grpc.DialOption
// shell transient context
- FilerHost string
- FilerPort int64
- Directory string
+ FilerHost string
+ FilerPort int64
+ FilerAddress string
+ Directory string
}
type CommandEnv struct {
@@ -75,7 +76,7 @@ func (ce *CommandEnv) confirmIsLocked() error {
return nil
}
- return fmt.Errorf("need to lock to continue")
+ return fmt.Errorf("need to run \"lock\" first to continue")
}
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index 1dd611ca5..765b0efda 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -31,6 +31,7 @@ func RunShell(options ShellOptions) {
})
line.SetCtrlCAborts(true)
+ line.SetTabCompletionStyle(liner.TabPrints)
setCompletionHandler()
loadHistory()
@@ -147,9 +148,11 @@ func loadHistory() {
func saveHistory() {
if f, err := os.Create(historyPath); err != nil {
- fmt.Printf("Error writing history file: %v\n", err)
+ fmt.Printf("Error creating history file: %v\n", err)
} else {
- line.WriteHistory(f)
+ if _, err = line.WriteHistory(f); err != nil {
+ fmt.Printf("Error writing history file: %v\n", err)
+ }
f.Close()
}
}