| author | Bl1tz23 <alex3angle@gmail.com> | 2021-08-10 13:45:24 +0300 |
|---|---|---|
| committer | Bl1tz23 <alex3angle@gmail.com> | 2021-08-10 13:45:24 +0300 |
| commit | 1c94b3d01340baad000188550fcf2ccab6ca80e5 (patch) | |
| tree | 12c3da17eb2d1a43fef78021a3d7c79110b0ff5f /weed/shell | |
| parent | e6e57db530217ff57b3622b4672b03ebb6313e96 (diff) | |
| parent | f9cf9b93d32a2b01bc4d95ce7d24d86ef60be668 (diff) | |
| download | seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.tar.xz seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.zip | |
merge master, resolve conflicts
Diffstat (limited to 'weed/shell')
24 files changed, 1521 insertions, 158 deletions
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go index 2a114e61b..ba502a6b9 100644 --- a/weed/shell/command_collection_list.go +++ b/weed/shell/command_collection_list.go @@ -22,6 +22,14 @@ func (c *commandCollectionList) Help() string { return `list all collections` } +type CollectionInfo struct { + FileCount uint64 + DeleteCount uint64 + DeletedByteCount uint64 + Size uint64 + VolumeCount int +} + func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { collections, err := ListCollectionNames(commandEnv, true, true) @@ -30,8 +38,21 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer return err } + topologyInfo, _, err := collectTopologyInfo(commandEnv) + if err != nil { + return err + } + + collectionInfos := make(map[string]*CollectionInfo) + + writeCollectionInfo(writer, topologyInfo, collectionInfos) + for _, c := range collections { - fmt.Fprintf(writer, "collection:\"%s\"\n", c) + cif, found := collectionInfos[c] + if !found { + continue + } + fmt.Fprintf(writer, "collection:\"%s\"\tvolumeCount:%d\tsize:%d\tfileCount:%d\tdeletedBytes:%d\tdeletion:%d\n", c, cif.VolumeCount, cif.Size, cif.FileCount, cif.DeletedByteCount, cif.DeleteCount) } fmt.Fprintf(writer, "Total %d collections.\n", len(collections)) @@ -56,3 +77,34 @@ func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEc } return } + +func addToCollection(collectionInfos map[string]*CollectionInfo, vif *master_pb.VolumeInformationMessage) { + c := vif.Collection + cif, found := collectionInfos[c] + if !found { + cif = &CollectionInfo{} + collectionInfos[c] = cif + } + cif.Size += vif.Size + cif.DeleteCount += vif.DeleteCount + cif.FileCount += vif.FileCount + cif.DeletedByteCount += vif.DeletedByteCount + cif.VolumeCount++ +} + +func writeCollectionInfo(writer io.Writer, t *master_pb.TopologyInfo, collectionInfos map[string]*CollectionInfo) { + for _, dc := range t.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for _, diskInfo := range dn.DiskInfos { + for _, vi := range diskInfo.VolumeInfos { + addToCollection(collectionInfos, vi) + } + //for _, ecShardInfo := range diskInfo.EcShardInfos { + // + //} + } + } + } + } +} diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index dafdb041a..e4d597d84 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -223,21 +223,6 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn } -func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) { - - eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { - for _, v := range diskInfo.EcShardInfos { - if v.Collection == selectedCollection && v.Id == uint32(vid) { - ecShardInfos = append(ecShardInfos, v) - } - } - } - }) - - return -} - func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) { vidMap := make(map[uint32]bool) diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index 02cd7ac69..0aae51d74 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -30,17 +30,17 @@ func (c *commandFsConfigure) Help() string { fs.configure # trying the 
changes and see the possible configuration file content - fs.configure -locationPrfix=/my/folder -collection=abc - fs.configure -locationPrfix=/my/folder -collection=abc -ttl=7d + fs.configure -locationPrefix=/my/folder -collection=abc + fs.configure -locationPrefix=/my/folder -collection=abc -ttl=7d # example: configure adding only 1 physical volume for each bucket collection - fs.configure -locationPrfix=/buckets/ -volumeGrowthCount=1 + fs.configure -locationPrefix=/buckets/ -volumeGrowthCount=1 # apply the changes - fs.configure -locationPrfix=/my/folder -collection=abc -apply + fs.configure -locationPrefix=/my/folder -collection=abc -apply # delete the changes - fs.configure -locationPrfix=/my/folder -delete -apply + fs.configure -locationPrefix=/my/folder -delete -apply ` } @@ -54,6 +54,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl") diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes") + isReadOnly := fsConfigureCommand.Bool("readOnly", false, "disable writes") volumeGrowthCount := fsConfigureCommand.Int("volumeGrowthCount", 0, "the number of physical volumes to add if no writable volumes") isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix") apply := fsConfigureCommand.Bool("apply", false, "update and apply filer configuration") @@ -61,20 +62,11 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io return nil } - var buf bytes.Buffer - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - return filer.ReadEntry(commandEnv.MasterClient, client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, &buf) - }); err != nil && err != filer_pb.ErrNotFound { + fc, err := readFilerConf(commandEnv) + if err != nil { return err } - fc := filer.NewFilerConf() - if buf.Len() > 0 { - if err = fc.LoadFromBytes(buf.Bytes()); err != nil { - return err - } - } - if *locationPrefix != "" { locConf := &filer_pb.FilerConf_PathConf{ LocationPrefix: *locationPrefix, @@ -84,6 +76,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io Fsync: *fsync, DiskType: *diskType, VolumeGrowthCount: uint32(*volumeGrowthCount), + ReadOnly: *isReadOnly, } // check collection @@ -110,15 +103,17 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io } } - buf.Reset() - fc.ToText(&buf) + var buf2 bytes.Buffer + fc.ToText(&buf2) - fmt.Fprintf(writer, string(buf.Bytes())) + fmt.Fprintf(writer, string(buf2.Bytes())) fmt.Fprintln(writer) if *apply { - if err := filer.SaveAs(commandEnv.option.FilerHost, int(commandEnv.option.FilerPort), filer.DirectoryEtcSeaweedFS, filer.FilerConfName, "text/plain; charset=utf-8", &buf); err != nil { + if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes()) + }); err != nil && err != filer_pb.ErrNotFound { return err } @@ -127,3 +122,20 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io return nil } + +func readFilerConf(commandEnv *CommandEnv) (*filer.FilerConf, error) { + var buf bytes.Buffer + if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer.ReadEntry(commandEnv.MasterClient, 
client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, &buf) + }); err != nil && err != filer_pb.ErrNotFound { + return nil, fmt.Errorf("read %s/%s: %v", filer.DirectoryEtcSeaweedFS, filer.FilerConfName, err) + } + + fc := filer.NewFilerConf() + if buf.Len() > 0 { + if err := fc.LoadFromBytes(buf.Bytes()); err != nil { + return nil, fmt.Errorf("parse %s/%s: %v", filer.DirectoryEtcSeaweedFS, filer.FilerConfName, err) + } + } + return fc, nil +} diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go new file mode 100644 index 000000000..11b663eec --- /dev/null +++ b/weed/shell/command_fs_mkdir.go @@ -0,0 +1,57 @@ +package shell + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "os" + "time" +) + +func init() { + Commands = append(Commands, &commandFsMkdir{}) +} + +type commandFsMkdir struct { +} + +func (c *commandFsMkdir) Name() string { + return "fs.mkdir" +} + +func (c *commandFsMkdir) Help() string { + return `create a directory + + fs.mkdir path/to/dir +` +} + +func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + path, err := commandEnv.parseUrl(findInputDirectory(args)) + if err != nil { + return err + } + + dir, name := util.FullPath(path).DirAndName() + + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + _, createErr := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0777 | os.ModeDir), + }, + }, + }) + return createErr + }) + + return +} diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go new file mode 100644 index 000000000..b383366ca --- /dev/null +++ b/weed/shell/command_fs_rm.go @@ -0,0 +1,100 @@ +package shell + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsRm{}) +} + +type commandFsRm struct { +} + +func (c *commandFsRm) Name() string { + return "fs.rm" +} + +func (c *commandFsRm) Help() string { + return `remove file and directory entries + + fs.rm [-rf] <entry1> <entry2> ... + + fs.rm /dir/file_name1 dir/file_name2 + fs.rm /dir + + The option "-r" can be recursive. + The option "-f" can be ignored by recursive error. 
+` +} + +func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + isRecursive := false + ignoreRecursiveError := false + var entiries []string + for _, arg := range args { + if !strings.HasPrefix(arg, "-") { + entiries = append(entiries, arg) + continue + } + for _, t := range arg { + switch t { + case 'r': + isRecursive = true + case 'f': + ignoreRecursiveError = true + } + } + } + if len(entiries) < 1 { + return fmt.Errorf("need to have arguments") + } + + commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + for _, entry := range entiries { + targetPath, err := commandEnv.parseUrl(entry) + if err != nil { + fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err) + continue + } + + targetDir, targetName := util.FullPath(targetPath).DirAndName() + + lookupRequest := &filer_pb.LookupDirectoryEntryRequest{ + Directory: targetDir, + Name: targetName, + } + _, err = filer_pb.LookupEntry(client, lookupRequest) + if err != nil { + fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err) + continue + } + + request := &filer_pb.DeleteEntryRequest{ + Directory: targetDir, + Name: targetName, + IgnoreRecursiveError: ignoreRecursiveError, + IsDeleteData: true, + IsRecursive: isRecursive, + IsFromOtherCluster: false, + Signatures: nil, + } + if resp, err := client.DeleteEntry(context.Background(), request); err != nil { + fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err) + } else { + if resp.Error != "" { + fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, resp.Error) + } + } + } + return nil + }) + + return +} diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go new file mode 100644 index 000000000..a964e994c --- /dev/null +++ b/weed/shell/command_remote_cache.go @@ -0,0 +1,151 @@ +package shell + +import ( + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "strings" +) + +func init() { + Commands = append(Commands, &commandRemoteCache{}) +} + +type commandRemoteCache struct { +} + +func (c *commandRemoteCache) Name() string { + return "remote.cache" +} + +func (c *commandRemoteCache) Help() string { + return `cache the file content for mounted directories or files + + # assume a remote storage is configured to name "cloud1" + remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy + # mount and pull one bucket + remote.mount -dir=xxx -remote=cloud1/bucket + + # after mount, run one of these command to cache the content of the files + remote.cache -dir=xxx + remote.cache -dir=xxx/some/sub/dir + +` +} + +func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + dir := remoteMountCommand.String("dir", "", "a directory in filer") + + if err = remoteMountCommand.Parse(args); err != nil { + return nil + } + + mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if listErr != nil { + return listErr + } + if *dir == "" { + jsonPrintln(writer, mappings) + fmt.Fprintln(writer, "need to specify '-dir' option") + return nil + } + + var localMountedDir string + var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation + for k, loc := range mappings.Mappings { + if strings.HasPrefix(*dir, k) { + localMountedDir, remoteStorageMountedLocation = k, loc + } + } + if localMountedDir == "" { + 
jsonPrintln(writer, mappings) + fmt.Fprintf(writer, "%s is not mounted\n", *dir) + return nil + } + + // find remote storage configuration + remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name) + if err != nil { + return err + } + + // pull content from remote + if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil { + return fmt.Errorf("cache content data: %v", err) + } + + return nil +} + +func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) { + + err = filer_pb.ReadDirAllEntries(filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + if !visitEntry(dirPath, entry) { + return nil + } + subDir := dirPath.Child(entry.Name) + if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil { + return err + } + } else { + if !visitEntry(dirPath, entry) { + return nil + } + } + return nil + }) + return +} + +func shouldCacheToLocal(entry *filer_pb.Entry) bool { + if entry.IsDirectory { + return false + } + if entry.RemoteEntry == nil { + return false + } + if entry.RemoteEntry.LocalMtime == 0 && entry.RemoteEntry.RemoteSize > 0 { + return true + } + return false +} + +func mayHaveCachedToLocal(entry *filer_pb.Entry) bool { + if entry.IsDirectory { + return false + } + if entry.RemoteEntry == nil { + return false + } + if entry.RemoteEntry.LocalMtime > 0 && len(entry.Chunks) > 0 { + return true + } + return false +} + +func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *filer_pb.RemoteConf) error { + + return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + if !shouldCacheToLocal(entry) { + return true // true means recursive traversal should continue + } + + println(dir, entry.Name) + + remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) + + if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { + fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) + return false + } + + return true + }) +} diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go new file mode 100644 index 000000000..7a9ad1f65 --- /dev/null +++ b/weed/shell/command_remote_configure.go @@ -0,0 +1,153 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "io" + "regexp" + "strings" +) + +func init() { + Commands = append(Commands, &commandRemoteConfigure{}) +} + +type commandRemoteConfigure struct { +} + +func (c *commandRemoteConfigure) Name() string { + return "remote.configure" +} + +func (c *commandRemoteConfigure) Help() string { + return `remote storage configuration + + # see the current configurations + remote.configure + + # set or update a configuration + remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy + + # 
delete one configuration + remote.configure -delete -name=cloud1 + +` +} + +var ( + isAlpha = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9]*$`).MatchString +) + +func (c *commandRemoteConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + conf := &filer_pb.RemoteConf{} + + remoteConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + isDelete := remoteConfigureCommand.Bool("delete", false, "delete one remote storage by its name") + + remoteConfigureCommand.StringVar(&conf.Name, "name", "", "a short name to identify the remote storage") + remoteConfigureCommand.StringVar(&conf.Type, "type", "s3", "storage type, currently only support s3") + + remoteConfigureCommand.StringVar(&conf.S3AccessKey, "s3.access_key", "", "s3 access key") + remoteConfigureCommand.StringVar(&conf.S3SecretKey, "s3.secret_key", "", "s3 secret key") + remoteConfigureCommand.StringVar(&conf.S3Region, "s3.region", "us-east-2", "s3 region") + remoteConfigureCommand.StringVar(&conf.S3Endpoint, "s3.endpoint", "", "endpoint for s3-compatible local object store") + + if err = remoteConfigureCommand.Parse(args); err != nil { + return nil + } + + if conf.Name == "" { + return c.listExistingRemoteStorages(commandEnv, writer) + } + + if !isAlpha(conf.Name) { + return fmt.Errorf("only letters and numbers allowed in name: %v", conf.Name) + } + + if *isDelete { + return c.deleteRemoteStorage(commandEnv, writer, conf.Name) + } + + return c.saveRemoteStorage(commandEnv, writer, conf) + +} + +func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandEnv, writer io.Writer) error { + + return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(filer.DirectoryEtcRemote), "", func(entry *filer_pb.Entry, isLast bool) error { + if len(entry.Content) == 0 { + fmt.Fprintf(writer, "skipping %s\n", entry.Name) + return nil + } + if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + return nil + } + conf := &filer_pb.RemoteConf{} + + if err := proto.Unmarshal(entry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err) + } + + conf.S3SecretKey = "" + + m := jsonpb.Marshaler{ + EmitDefaults: false, + Indent: " ", + } + + err := m.Marshal(writer, conf) + fmt.Fprintln(writer) + + return err + }) + +} + +func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error { + + return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.DeleteEntryRequest{ + Directory: filer.DirectoryEtcRemote, + Name: storageName + filer.REMOTE_STORAGE_CONF_SUFFIX, + IgnoreRecursiveError: false, + IsDeleteData: true, + IsRecursive: true, + IsFromOtherCluster: false, + Signatures: nil, + } + _, err := client.DeleteEntry(context.Background(), request) + + if err == nil { + fmt.Fprintf(writer, "removed: %s\n", storageName) + } + + return err + + }) + +} + +func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, writer io.Writer, conf *filer_pb.RemoteConf) error { + + data, err := proto.Marshal(conf) + if err != nil { + return err + } + + if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, data) + }); err != nil && err != filer_pb.ErrNotFound { + return err + } + + return nil + +} diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go new file 
mode 100644 index 000000000..3b04fec63 --- /dev/null +++ b/weed/shell/command_remote_mount.go @@ -0,0 +1,249 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "io" +) + +func init() { + Commands = append(Commands, &commandRemoteMount{}) +} + +type commandRemoteMount struct { +} + +func (c *commandRemoteMount) Name() string { + return "remote.mount" +} + +func (c *commandRemoteMount) Help() string { + return `mount remote storage and pull its metadata + + # assume a remote storage is configured to name "cloud1" + remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy + + # mount and pull one bucket + remote.mount -dir=xxx -remote=cloud1/bucket + # mount and pull one directory in the bucket + remote.mount -dir=xxx -remote=cloud1/bucket/dir1 + + # after mount, start a separate process to write updates to remote storage + weed filer.remote.sync -filer=<filerHost>:<filerPort> -dir=xxx + +` +} + +func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + dir := remoteMountCommand.String("dir", "", "a directory in filer") + nonEmpty := remoteMountCommand.Bool("nonempty", false, "allows the mounting over a non-empty directory") + remote := remoteMountCommand.String("remote", "", "a directory in remote storage, ex. <storageName>/<bucket>/path/to/dir") + + if err = remoteMountCommand.Parse(args); err != nil { + return nil + } + + if *dir == "" { + _, err = listExistingRemoteStorageMounts(commandEnv, writer) + return err + } + + remoteStorageLocation := remote_storage.ParseLocation(*remote) + + // find configuration for remote storage + // remotePath is /<bucket>/path/to/dir + remoteConf, err := c.findRemoteStorageConfiguration(commandEnv, writer, remoteStorageLocation) + if err != nil { + return fmt.Errorf("find configuration for %s: %v", *remote, err) + } + + // pull metadata from remote + if err = c.pullMetadata(commandEnv, writer, *dir, *nonEmpty, remoteConf, remoteStorageLocation); err != nil { + return fmt.Errorf("pull metadata: %v", err) + } + + // store a mount configuration in filer + if err = c.saveMountMapping(commandEnv, writer, *dir, remoteStorageLocation); err != nil { + return fmt.Errorf("save mount mapping: %v", err) + } + + return nil +} + +func listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (mappings *filer_pb.RemoteStorageMapping, err error) { + + // read current mapping + mappings, err = filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if err != nil { + return mappings, err + } + + jsonPrintln(writer, mappings) + + return + +} + +func jsonPrintln(writer io.Writer, message proto.Message) error { + m := jsonpb.Marshaler{ + EmitDefaults: false, + Indent: " ", + } + + err := m.Marshal(writer, message) + fmt.Fprintln(writer) + return err +} + +func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) { + + return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name) + +} + +func (c *commandRemoteMount) pullMetadata(commandEnv 
*CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *filer_pb.RemoteConf, remote *filer_pb.RemoteStorageLocation) error { + + // find existing directory, and ensure the directory is empty + err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + parent, name := util.FullPath(dir).DirAndName() + _, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: parent, + Name: name, + }) + if lookupErr != nil { + return fmt.Errorf("lookup %s: %v", dir, lookupErr) + } + + mountToDirIsEmpty := true + listErr := filer_pb.SeaweedList(client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { + mountToDirIsEmpty = false + return nil + }, "", false, 1) + + if listErr != nil { + return fmt.Errorf("list %s: %v", dir, listErr) + } + + if !mountToDirIsEmpty { + if !nonEmpty { + return fmt.Errorf("dir %s is not empty", dir) + } + } + + return nil + }) + if err != nil { + return err + } + + // visit remote storage + remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return err + } + + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + ctx := context.Background() + err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { + localDir := dir + remoteDir + println(util.NewFullPath(localDir, name)) + + lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: localDir, + Name: name, + }) + var existingEntry *filer_pb.Entry + if lookupErr != nil { + if lookupErr != filer_pb.ErrNotFound { + return lookupErr + } + } else { + existingEntry = lookupResponse.Entry + } + + if existingEntry == nil { + _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: localDir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: isDirectory, + Attributes: &filer_pb.FuseAttributes{ + FileSize: uint64(remoteEntry.RemoteSize), + Mtime: remoteEntry.RemoteMtime, + FileMode: uint32(0644), + }, + RemoteEntry: remoteEntry, + }, + }) + return createErr + } else { + if existingEntry.RemoteEntry == nil || existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag { + return doSaveRemoteEntry(client, localDir, existingEntry, remoteEntry) + } + } + return nil + }) + return err + }) + + if err != nil { + return err + } + + return nil +} + +func (c *commandRemoteMount) saveMountMapping(commandEnv *CommandEnv, writer io.Writer, dir string, remoteStorageLocation *filer_pb.RemoteStorageLocation) (err error) { + + // read current mapping + var oldContent, newContent []byte + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + oldContent, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE) + return err + }) + if err != nil { + if err != filer_pb.ErrNotFound { + return fmt.Errorf("read existing mapping: %v", err) + } + } + + // add new mapping + newContent, err = filer.AddRemoteStorageMapping(oldContent, dir, remoteStorageLocation) + if err != nil { + return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err) + } + + // save back + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE, newContent) + }) + if err != nil { + return fmt.Errorf("save mapping: %v", err) + } + + return nil +} + +func 
doSaveRemoteEntry(client filer_pb.SeaweedFilerClient, localDir string, existingEntry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { + existingEntry.RemoteEntry = remoteEntry + existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize) + existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime + _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: localDir, + Entry: existingEntry, + }) + if updateErr != nil { + return updateErr + } + return nil +} diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go new file mode 100644 index 000000000..64cc1472c --- /dev/null +++ b/weed/shell/command_remote_uncache.go @@ -0,0 +1,99 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "strings" +) + +func init() { + Commands = append(Commands, &commandRemoteUncache{}) +} + +type commandRemoteUncache struct { +} + +func (c *commandRemoteUncache) Name() string { + return "remote.uncache" +} + +func (c *commandRemoteUncache) Help() string { + return `keep the metadata but remote cache the file content for mounted directories or files + + remote.uncache -dir=xxx + remote.uncache -dir=xxx/some/sub/dir + +` +} + +func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + dir := remoteMountCommand.String("dir", "", "a directory in filer") + + if err = remoteMountCommand.Parse(args); err != nil { + return nil + } + + mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if listErr != nil { + return listErr + } + if *dir == "" { + jsonPrintln(writer, mappings) + fmt.Fprintln(writer, "need to specify '-dir' option") + return nil + } + + var localMountedDir string + for k := range mappings.Mappings { + if strings.HasPrefix(*dir, k) { + localMountedDir = k + } + } + if localMountedDir == "" { + jsonPrintln(writer, mappings) + fmt.Fprintf(writer, "%s is not mounted\n", *dir) + return nil + } + + // pull content from remote + if err = c.uncacheContentData(commandEnv, writer, util.FullPath(*dir)); err != nil { + return fmt.Errorf("cache content data: %v", err) + } + + return nil +} + +func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer io.Writer, dirToCache util.FullPath) error { + + return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { + if !mayHaveCachedToLocal(entry) { + return true // true means recursive traversal should continue + } + entry.RemoteEntry.LocalMtime = 0 + entry.Chunks = nil + + println(dir, entry.Name) + + err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: string(dir), + Entry: entry, + }) + return updateErr + }) + if err != nil { + fmt.Fprintf(writer, "uncache %+v: %v\n", dir.Child(entry.Name), err) + return false + } + + return true + }) +} diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go new file mode 100644 index 000000000..b16da44f1 --- /dev/null +++ b/weed/shell/command_remote_unmount.go @@ -0,0 +1,146 @@ +package shell + +import ( + "context" + "flag" + "fmt" + 
"github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "io" +) + +func init() { + Commands = append(Commands, &commandRemoteUnmount{}) +} + +type commandRemoteUnmount struct { +} + +func (c *commandRemoteUnmount) Name() string { + return "remote.unmount" +} + +func (c *commandRemoteUnmount) Help() string { + return `unmount remote storage + + # assume a remote storage is configured to name "s3_1" + remote.configure -name=s3_1 -type=s3 -access_key=xxx -secret_key=yyy + # mount and pull one bucket + remote.mount -dir=xxx -remote=s3_1/bucket + + # unmount the mounted directory and remove its cache + remote.unmount -dir=xxx + +` +} + +func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + dir := remoteMountCommand.String("dir", "", "a directory in filer") + + if err = remoteMountCommand.Parse(args); err != nil { + return nil + } + + mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if listErr != nil { + return listErr + } + if *dir == "" { + return jsonPrintln(writer, mappings) + } + + _, found := mappings.Mappings[*dir] + if !found { + return fmt.Errorf("directory %s is not mounted", *dir) + } + + // purge mounted data + if err = c.purgeMountedData(commandEnv, *dir); err != nil { + return fmt.Errorf("purge mounted data: %v", err) + } + + // store a mount configuration in filer + if err = c.deleteMountMapping(commandEnv, *dir); err != nil { + return fmt.Errorf("delete mount mapping: %v", err) + } + + return nil +} + +func (c *commandRemoteUnmount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) { + + return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name) + +} + +func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error { + + // find existing directory, and ensure the directory is empty + err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + parent, name := util.FullPath(dir).DirAndName() + lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: parent, + Name: name, + }) + if lookupErr != nil { + return fmt.Errorf("lookup %s: %v", dir, lookupErr) + } + + oldEntry := lookupResp.Entry + + deleteError := filer_pb.DoRemove(client, parent, name, true, true, true, false, nil) + if deleteError != nil { + return fmt.Errorf("delete %s: %v", dir, deleteError) + } + + mkdirErr := filer_pb.DoMkdir(client, parent, name, func(entry *filer_pb.Entry) { + entry.Attributes = oldEntry.Attributes + entry.Extended = oldEntry.Extended + }) + if mkdirErr != nil { + return fmt.Errorf("mkdir %s: %v", dir, mkdirErr) + } + + return nil + }) + if err != nil { + return err + } + + return nil +} + +func (c *commandRemoteUnmount) deleteMountMapping(commandEnv *CommandEnv, dir string) (err error) { + + // read current mapping + var oldContent, newContent []byte + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + oldContent, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE) + return err + }) + if err != nil { + if err != filer_pb.ErrNotFound { + return fmt.Errorf("read existing 
mapping: %v", err) + } + } + + // add new mapping + newContent, err = filer.RemoveRemoteStorageMapping(oldContent, dir) + if err != nil { + return fmt.Errorf("delete mount %s: %v", dir, err) + } + + // save back + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE, newContent) + }) + if err != nil { + return fmt.Errorf("save mapping: %v", err) + } + + return nil +} diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go index a8d8c5c29..26953c249 100644 --- a/weed/shell/command_s3_bucket_delete.go +++ b/weed/shell/command_s3_bucket_delete.go @@ -1,8 +1,10 @@ package shell import ( + "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "io" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -49,6 +51,17 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer return fmt.Errorf("read buckets: %v", err) } + // delete the collection directly first + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ + Name: *bucketName, + }) + return err + }) + if err != nil { + return + } + return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false, nil) } diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go index ca51ef72f..5eab2ebd0 100644 --- a/weed/shell/command_s3_configure.go +++ b/weed/shell/command_s3_configure.go @@ -164,7 +164,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io } buf.Reset() - filer.S3ConfigurationToText(&buf, s3cfg) + filer.ProtoToText(&buf, s3cfg) fmt.Fprintf(writer, string(buf.Bytes())) fmt.Fprintln(writer) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index ad7da0e44..162b66556 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -120,7 +120,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, vo func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { - // balance writable volumes + // balance read only volumes for _, n := range nodes { n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { if collection != "ALL_COLLECTIONS" { @@ -128,14 +128,14 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskT return false } } - return v.DiskType == string(diskType) && (!v.ReadOnly && v.Size < volumeSizeLimit) + return v.DiskType == string(diskType) && (v.ReadOnly || v.Size >= volumeSizeLimit) }) } - if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortWritableVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortReadOnlyVolumes, applyBalancing); err != nil { return err } - // balance readable volumes + // balance writable volumes for _, n := range nodes { n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { if collection != "ALL_COLLECTIONS" { @@ -143,10 +143,10 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskT return false } } - return v.DiskType == 
string(diskType) && (v.ReadOnly || v.Size >= volumeSizeLimit) + return v.DiskType == string(diskType) && (!v.ReadOnly && v.Size < volumeSizeLimit) }) } - if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortReadOnlyVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortWritableVolumes, applyBalancing); err != nil { return err } @@ -340,7 +340,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f } fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) if applyChange { - return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType) + return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType, false) } return nil } diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go index b77811f51..9732e9bb7 100644 --- a/weed/shell/command_volume_balance_test.go +++ b/weed/shell/command_volume_balance_test.go @@ -1,6 +1,8 @@ package shell import ( + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/stretchr/testify/assert" "testing" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -181,3 +183,14 @@ func TestBalance(t *testing.T) { } } + +func TestVolumeSelection(t *testing.T) { + topologyInfo := parseOutput(topoData) + + vids, err := collectVolumeIdsForTierChange(nil, topologyInfo, 1000, types.ToDiskType("hdd"), "", 20.0, 0) + if err != nil { + t.Errorf("collectVolumeIdsForTierChange: %v", err) + } + assert.Equal(t, 378, len(vids)) + +} diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 0f156ac2f..7e060f3d3 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -89,25 +89,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write continue } - // reset index db - aDB.Close() - bDB.Close() - aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb() - - // read index db - if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil { - return err - } - if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil { - return err - } - - // find and make up the differnces - if err := c.doVolumeCheckDisk(aDB, bDB, a, b, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil { - return err - } - if err := c.doVolumeCheckDisk(bDB, aDB, b, a, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil { - return err + if err := c.syncTwoReplicas(aDB, bDB, a, verbose, writer, b, err, applyChanges, nonRepairThreshold); err != nil { + fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } replicas = replicas[1:] } @@ -116,7 +99,34 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write return nil } -func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, nonRepairThreshold float64) error { +func (c *commandVolumeCheckDisk) syncTwoReplicas(aDB *needle_map.MemDb, bDB 
*needle_map.MemDb, a *VolumeReplica, verbose *bool, writer io.Writer, b *VolumeReplica, err error, applyChanges *bool, nonRepairThreshold *float64) error { + aHasChanges, bHasChanges := true, true + for aHasChanges || bHasChanges { + // reset index db + aDB.Close() + bDB.Close() + aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb() + + // read index db + if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil { + return err + } + if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil { + return err + } + + // find and make up the differences + if aHasChanges, err = c.doVolumeCheckDisk(aDB, bDB, a, b, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil { + return err + } + if bHasChanges, err = c.doVolumeCheckDisk(bDB, aDB, b, a, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil { + return err + } + } + return nil +} + +func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, nonRepairThreshold float64) (hasChanges bool, err error) { // find missing keys // hash join, can be more efficient @@ -133,12 +143,12 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d entries\n", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles)) if counter == 0 || len(missingNeedles) == 0 { - return nil + return false, nil } missingNeedlesFraction := float64(len(missingNeedles)) / float64(counter) if missingNeedlesFraction > nonRepairThreshold { - return fmt.Errorf( + return false, fmt.Errorf( "failed to start repair volume %d, percentage of missing keys is greater than the threshold: %.2f > %.2f", source.info.Id, missingNeedlesFraction, nonRepairThreshold) } @@ -147,7 +157,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m needleBlob, err := c.readSourceNeedleBlob(source.location.dataNode.Id, source.info.Id, needleValue) if err != nil { - return err + return hasChanges, err } if !applyChanges { @@ -158,13 +168,15 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m fmt.Fprintf(writer, "read %d,%x %s => %s \n", source.info.Id, needleValue.Key, source.location.dataNode.Id, target.location.dataNode.Id) } - if err := c.writeNeedleBlobToTarget(target.location.dataNode.Id, source.info.Id, needleValue, needleBlob); err != nil { - return err + hasChanges = true + + if err = c.writeNeedleBlobToTarget(target.location.dataNode.Id, source.info.Id, needleValue, needleBlob); err != nil { + return hasChanges, err } } - return nil + return } func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer string, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { diff --git a/weed/shell/command_volume_delete_empty.go b/weed/shell/command_volume_delete_empty.go new file mode 100644 index 000000000..079915f66 --- /dev/null +++ b/weed/shell/command_volume_delete_empty.go @@ -0,0 +1,74 @@ +package shell + +import ( + "flag" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "io" + "log" + "time" +) + +func init() { + Commands = append(Commands, &commandVolumeDeleteEmpty{}) +} + +type commandVolumeDeleteEmpty struct { +} + +func (c 
*commandVolumeDeleteEmpty) Name() string { + return "volume.deleteEmpty" +} + +func (c *commandVolumeDeleteEmpty) Help() string { + return `delete empty volumes from all volume servers + + volume.deleteEmpty -quietFor=24h -force + + This command deletes all empty volumes from one volume server. + +` +} + +func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + quietPeriod := volDeleteCommand.Duration("quietFor", 24*time.Hour, "select empty volumes with no recent writes, avoid newly created ones") + applyBalancing := volDeleteCommand.Bool("force", false, "apply to delete empty volumes") + if err = volDeleteCommand.Parse(args); err != nil { + return nil + } + + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv) + if err != nil { + return err + } + + quietSeconds := int64(*quietPeriod / time.Second) + nowUnixSeconds := time.Now().Unix() + + eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if v.Size <= 8 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { + if *applyBalancing { + log.Printf("deleting empty volume %d from %s", v.Id, dn.Id) + if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), dn.Id); deleteErr != nil { + err = deleteErr + } + continue + } else { + log.Printf("empty volume %d from %s", v.Id, dn.Id) + } + } + } + } + }) + + return +} diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 538351fd0..326cb2a40 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -58,6 +58,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") + retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry") if err = volFixReplicationCommand.Parse(args); err != nil { return nil } @@ -100,7 +101,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } // find the most under populated data nodes - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations) + return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount) } @@ -154,64 +155,74 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) { for _, vid := range underReplicatedVolumeIds { - replicas := volumeReplicas[vid] - replica := 
pickOneReplicaToCopyFrom(replicas) - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) - foundNewLocation := false - hasSkippedCollection := false - keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) - fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) - for _, dst := range allLocations { - // check whether data nodes satisfy the constraints - if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { - // check collection name pattern - if *c.collectionPattern != "" { - matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) - if err != nil { - return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) - } - if !matched { - hasSkippedCollection = true - break - } - } - - // ask the volume server to replicate the volume - foundNewLocation = true - fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) + for i := 0; i < retryCount+1; i++ { + if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { + break + } + } + } + return +} - if !takeAction { +func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error { + replicas := volumeReplicas[vid] + replica := pickOneReplicaToCopyFrom(replicas) + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + foundNewLocation := false + hasSkippedCollection := false + keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) + for _, dst := range allLocations { + // check whether data nodes satisfy the constraints + if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { + // check collection name pattern + if *c.collectionPattern != "" { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + hasSkippedCollection = true break } + } - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ - VolumeId: replica.info.Id, - SourceDataNode: replica.location.dataNode.Id, - }) - if replicateErr != nil { - return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) - } - return nil - }) - - if err != nil { - return err - } + // ask the volume server to replicate the volume + foundNewLocation = true + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) + if !takeAction { // adjust free volume count dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- break } - } - if !foundNewLocation && !hasSkippedCollection { - fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) 
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + VolumeId: replica.info.Id, + SourceDataNode: replica.location.dataNode.Id, + }) + if replicateErr != nil { + return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) + } + return nil + }) + + if err != nil { + return err + } + + // adjust free volume count + dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- + break } + } + if !foundNewLocation && !hasSkippedCollection { + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } return nil } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 1fbc9ad35..27c253209 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -1,9 +1,11 @@ package shell import ( + "bufio" "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "io" "io/ioutil" "math" @@ -44,6 +46,12 @@ func (c *commandVolumeFsck) Help() string { 2. collect all file ids from the filer, as set B 3. find out the set A subtract B + If -findMissingChunksInFiler is enabled, this works + in a reverse way: + 1. collect all file ids from all volumes, as set A + 2. collect all file ids from the filer, as set B + 3. find out the set B subtract A + ` } @@ -55,6 +63,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) verbose := fsckCommand.Bool("v", false, "verbose mode") + findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"") + findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler") applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer") if err = fsckCommand.Parse(args); err != nil { return nil @@ -86,15 +96,108 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. 
} } - // collect all filer file ids - if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil { - return fmt.Errorf("failed to collect file ids from filer: %v", err) + if *findMissingChunksInFiler { + // collect all filer file ids and paths + if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, applyPurging); err != nil { + return fmt.Errorf("collectFilerFileIdAndPaths: %v", err) + } + // for each volume, check filer file ids + if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil { + return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err) + } + } else { + // collect all filer file ids + if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil { + return fmt.Errorf("failed to collect file ids from filer: %v", err) + } + // volume file ids substract filer file ids + if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil { + return fmt.Errorf("findExtraChunksInVolumeServers: %v", err) + } + } + + return nil +} + +func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, applyPurging *bool) error { + + if verbose { + fmt.Fprintf(writer, "checking each file from filer ...\n") + } + + files := make(map[uint32]*os.File) + for vid := range volumeIdToServer { + dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr) + } + files[vid] = dst } + defer func() { + for _, f := range files { + f.Close() + } + }() + + type Item struct { + vid uint32 + fileKey uint64 + cookie uint32 + path util.FullPath + } + return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(outputChan chan interface{}) { + buffer := make([]byte, 16) + for item := range outputChan { + i := item.(*Item) + if f, ok := files[i.vid]; ok { + util.Uint64toBytes(buffer, i.fileKey) + util.Uint32toBytes(buffer[8:], i.cookie) + util.Uint32toBytes(buffer[12:], uint32(len(i.path))) + f.Write(buffer) + f.Write([]byte(i.path)) + // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path) + } else { + fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path) + } + } + }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + if verbose && entry.Entry.IsDirectory { + fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name)) + } + dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) + if resolveErr != nil { + return nil + } + dChunks = append(dChunks, mChunks...) 
+		for _, chunk := range dChunks {
+			outputChan <- &Item{
+				vid:     chunk.Fid.VolumeId,
+				fileKey: chunk.Fid.FileKey,
+				cookie:  chunk.Fid.Cookie,
+				path:    util.NewFullPath(entry.Dir, entry.Entry.Name),
+			}
+		}
+		return nil
+	})
 
-	// volume file ids substract filer file ids
+	return nil
+}
+
+func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
+
+	for volumeId, vinfo := range volumeIdToVInfo {
+		checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose)
+		if checkErr != nil {
+			return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
+		}
+	}
+	return nil
+}
+
+func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
+
 	var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
 
 	for volumeId, vinfo := range volumeIdToVInfo {
-		inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose)
+		inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose)
 		if checkErr != nil {
 			return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
 		}
@@ -102,9 +205,9 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 		totalOrphanChunkCount += uint64(len(orphanFileIds))
 		totalOrphanDataSize += orphanDataSize
 
-		if *verbose {
+		if verbose {
 			for _, fid := range orphanFileIds {
-				fmt.Fprintf(writer, "%sxxxxxxxx\n", fid)
+				fmt.Fprintf(writer, "%s\n", fid)
 			}
 		}
 
@@ -112,8 +215,14 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 			if vinfo.isEcVolume {
 				fmt.Fprintf(writer, "Skip purging for Erasure Coded volumes.\n")
 			}
-			if err = c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
-				return fmt.Errorf("purge for volume %d: %v\n", volumeId, err)
+			if inUseCount == 0 {
+				if err := deleteVolume(c.env.option.GrpcDialOption, needle.VolumeId(volumeId), vinfo.server); err != nil {
+					return fmt.Errorf("delete volume %d: %v\n", volumeId, err)
+				}
+			} else {
+				if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
+					return fmt.Errorf("purge for volume %d: %v\n", volumeId, err)
+				}
 			}
 		}
 	}
@@ -130,7 +239,6 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n") } - return nil } @@ -203,7 +311,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer files[i.vid].Write(buffer) } }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { - dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks) + dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) if resolveErr != nil { return nil } @@ -218,6 +326,69 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer }) } +func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (err error) { + + if verbose { + fmt.Fprintf(writer, "find missing file chuns in volume %d ...\n", volumeId) + } + + db := needle_map.NewMemDb() + defer db.Close() + + if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil { + return + } + + file := getFilerFileIdFile(tempFolder, volumeId) + fp, err := os.Open(file) + if err != nil { + return + } + defer fp.Close() + + type Item struct { + fileKey uint64 + cookie uint32 + path util.FullPath + } + + br := bufio.NewReader(fp) + buffer := make([]byte, 16) + item := &Item{} + var readSize int + for { + readSize, err = io.ReadFull(br, buffer) + if err != nil || readSize != 16 { + if err == io.EOF { + return nil + } else { + break + } + } + + item.fileKey = util.BytesToUint64(buffer[:8]) + item.cookie = util.BytesToUint32(buffer[8:12]) + pathSize := util.BytesToUint32(buffer[12:16]) + pathBytes := make([]byte, int(pathSize)) + n, err := io.ReadFull(br, pathBytes) + if err != nil { + fmt.Fprintf(writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err) + } + if n != int(pathSize) { + fmt.Fprintf(writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n) + } + item.path = util.FullPath(string(pathBytes)) + + if _, found := db.Get(types.NeedleId(item.fileKey)); !found { + fmt.Fprintf(writer, "%d,%x%08x in %s %d not found\n", volumeId, item.fileKey, item.cookie, item.path, pathSize) + } + + } + + return + +} + func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { db := needle_map.NewMemDb() @@ -246,7 +417,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri var orphanFileCount uint64 db.AscendingVisit(func(n needle_map.NeedleValue) error { // fmt.Printf("%d,%x\n", volumeId, n.Key) - orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String())) + orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String())) orphanFileCount++ orphanDataSize += uint64(n.Size) return nil diff --git a/weed/shell/command_volume_list_test.go b/weed/shell/command_volume_list_test.go index 72c76f242..379cf4943 100644 --- a/weed/shell/command_volume_list_test.go +++ b/weed/shell/command_volume_list_test.go @@ -68,7 +68,7 @@ func parseOutput(output string) *master_pb.TopologyInfo { maxVolumeCount, _ := strconv.Atoi(maxVolumeCountStr) disk = &master_pb.DiskInfo{ Type: diskType, - MaxVolumeCount: uint64(maxVolumeCount), + MaxVolumeCount: int64(maxVolumeCount), } dn.DiskInfos[types.ToDiskType(diskType).String()] = disk } else { diff --git 
diff --git a/weed/shell/command_volume_list_test.go b/weed/shell/command_volume_list_test.go
index 72c76f242..379cf4943 100644
--- a/weed/shell/command_volume_list_test.go
+++ b/weed/shell/command_volume_list_test.go
@@ -68,7 +68,7 @@ func parseOutput(output string) *master_pb.TopologyInfo {
 				maxVolumeCount, _ := strconv.Atoi(maxVolumeCountStr)
 				disk = &master_pb.DiskInfo{
 					Type:           diskType,
-					MaxVolumeCount: uint64(maxVolumeCount),
+					MaxVolumeCount: int64(maxVolumeCount),
 				}
 				dn.DiskInfos[types.ToDiskType(diskType).String()] = disk
 			} else {
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 84f33db34..666e3e867 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -69,11 +69,11 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 		return fmt.Errorf("source and target volume servers are the same!")
 	}
 
-	return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr)
+	return LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr, false)
 }
 
 // LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
-func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string) (err error) {
+func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
 
 	log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
 	lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
@@ -83,7 +83,11 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string) (err error) {
 
 	log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
 	if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
-		return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
+		if skipTailError {
+			fmt.Fprintf(writer, "tail volume %d from %s to %s: %v\n", volumeId, sourceVolumeServer, targetVolumeServer, err)
+		} else {
+			return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
+		}
 	}
 
 	log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index f21d0334c..6e0c19ae1 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -52,6 +52,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 	volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
 	skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
 	applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
+	retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
 	if err = vsEvacuateCommand.Parse(args); err != nil {
 		return nil
 	}
@@ -60,7 +61,13 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 		return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
 	}
 
-	return volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer)
+	for i := 0; i < *retryCount+1; i++ {
+		if err = volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
+			return nil
+		}
+	}
+
+	return
}
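The -retry loop above retries immediately and, if all attempts fail, returns the last error. If spacing between attempts is wanted, a small wrapper like the following is one option (a sketch, not part of the commit; the retry helper and its semantics are made up for illustration):

package main

import (
	"fmt"
	"time"
)

// retry runs fn up to attempts times, sleeping between failed attempts
// and returning the last error if none succeed.
func retry(attempts int, delay time.Duration, fn func() error) (err error) {
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
		}
	}
	return err
}

func main() {
	calls := 0
	err := retry(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("transient failure %d", calls)
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}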
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index d6a49d6e1..d370d93e4 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -7,6 +7,8 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
 	"io"
+	"path/filepath"
+	"sync"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -17,6 +19,9 @@ func init() {
 }
 
 type commandVolumeTierMove struct {
+	activeServers     map[string]struct{}
+	activeServersLock sync.Mutex
+	activeServersCond *sync.Cond
 }
 
 func (c *commandVolumeTierMove) Name() string {
@@ -26,7 +31,7 @@ func (c *commandVolumeTierMove) Name() string {
 func (c *commandVolumeTierMove) Help() string {
 	return `change a volume from one disk type to another
 
-	volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h]
+	volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h]
 
 	Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
 	So "volume.fix.replication" and "volume.balance" should be followed.
@@ -36,12 +41,15 @@ func (c *commandVolumeTierMove) Help() string {
 
 func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 
+	c.activeServers = make(map[string]struct{})
+	c.activeServersCond = sync.NewCond(new(sync.Mutex))
+
 	if err = commandEnv.confirmIsLocked(); err != nil {
 		return
 	}
 
 	tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
-	collection := tierCommand.String("collection", "", "the collection name")
+	collectionPattern := tierCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
 	fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
 	quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes with no writes for this period")
 	source := tierCommand.String("fromDiskType", "", "the source disk type")
@@ -65,7 +73,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 	}
 
 	// collect all volumes that should change
-	volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collection, *fullPercentage, *quietPeriod)
+	volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod)
 	if err != nil {
 		return err
 	}
@@ -73,7 +81,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 	_, allLocations := collectVolumeReplicaLocations(topologyInfo)
 	for _, vid := range volumeIds {
-		if err = doVolumeTierMove(commandEnv, writer, *collection, vid, toDiskType, allLocations, *applyChange); err != nil {
+		if err = c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations, *applyChange); err != nil {
 			fmt.Printf("tier move volume %d: %v\n", vid, err)
 		}
 	}
@@ -90,7 +98,7 @@ func isOneOf(server string, locations []wdclient.Location) bool {
 	return false
 }
 
-func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
+func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
 	// find volume location
 	locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
 	if !found {
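The new -collectionPattern flag is matched against each volume's collection name with filepath.Match (see collectVolumeIdsForTierChange below), so '*' and '?' wildcards apply to the whole name and Match only returns an error for a malformed pattern. A quick illustration, with made-up collection names:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	for _, collection := range []string{"pvc-a1", "pvc-b2", "logs"} {
		matched, err := filepath.Match("pvc-*", collection)
		// prints: pvc-a1 true <nil>, pvc-b2 true <nil>, logs false <nil>
		fmt.Println(collection, matched, err)
	}
}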
@@ -120,25 +128,34 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
 			hasFoundTarget = true
 
 			if !applyChanges {
+				// adjust volume count
+				dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
 				break
 			}
 
-			// mark all replicas as read only
-			if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil {
-				return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
-			}
-			if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString()); err != nil {
-				return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
+			c.activeServersCond.L.Lock()
+			_, isSourceActive := c.activeServers[sourceVolumeServer]
+			_, isDestActive := c.activeServers[dst.dataNode.Id]
+			for isSourceActive || isDestActive {
+				c.activeServersCond.Wait()
+				_, isSourceActive = c.activeServers[sourceVolumeServer]
+				_, isDestActive = c.activeServers[dst.dataNode.Id]
 			}
+			c.activeServers[sourceVolumeServer] = struct{}{}
+			c.activeServers[dst.dataNode.Id] = struct{}{}
+			c.activeServersCond.L.Unlock()
 
-			// remove the remaining replicas
-			for _, loc := range locations {
-				if loc.Url != dst.dataNode.Id {
-					if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
-						fmt.Fprintf(writer, "failed to delete volume %d on %s\n", vid, loc.Url)
-					}
+			go func(dst location) {
+				if err := c.doMoveOneVolume(commandEnv, writer, vid, toDiskType, locations, sourceVolumeServer, dst); err != nil {
+					fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", vid, sourceVolumeServer, dst.dataNode.Id, err)
 				}
-			}
+				// the map must only be touched while holding the cond's mutex
+				c.activeServersCond.L.Lock()
+				delete(c.activeServers, sourceVolumeServer)
+				delete(c.activeServers, dst.dataNode.Id)
+				c.activeServersCond.L.Unlock()
+				c.activeServersCond.Broadcast()
+			}(dst)
+		}
 	}
 
@@ -149,7 +164,31 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
 	return nil
 }
 
-func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer string, dst location) (err error) {
+
+	// mark all replicas as read only
+	if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil {
+		return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
+	}
+	if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString(), true); err != nil {
+		return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
+	}
+
+	// adjust volume count
+	dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++
+
+	// remove the remaining replicas
+	for _, loc := range locations {
+		if loc.Url != dst.dataNode.Id && loc.Url != sourceVolumeServer {
+			if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
+				fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
+			}
+		}
+	}
+	return nil
+}
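The activeServers map and sync.Cond above form a per-server gate: a move may start only when neither its source nor its destination is already busy, and finishing a move wakes the waiters. A self-contained sketch of the same pattern (the serverGate type and all names here are illustrative, not from the commit; note that every access to the map happens under the cond's mutex):

package main

import (
	"fmt"
	"sync"
	"time"
)

// serverGate lets at most one in-flight transfer involve any given server.
type serverGate struct {
	cond   *sync.Cond
	active map[string]bool
}

func newServerGate() *serverGate {
	return &serverGate{cond: sync.NewCond(new(sync.Mutex)), active: map[string]bool{}}
}

// acquire blocks until both src and dst are free, then marks them busy.
func (g *serverGate) acquire(src, dst string) {
	g.cond.L.Lock()
	for g.active[src] || g.active[dst] {
		g.cond.Wait()
	}
	g.active[src], g.active[dst] = true, true
	g.cond.L.Unlock()
}

// release frees both servers and wakes all waiters, since two slots opened up.
func (g *serverGate) release(src, dst string) {
	g.cond.L.Lock()
	delete(g.active, src)
	delete(g.active, dst)
	g.cond.L.Unlock()
	g.cond.Broadcast()
}

func main() {
	g := newServerGate()
	var wg sync.WaitGroup
	moves := [][2]string{{"s1", "s2"}, {"s2", "s3"}, {"s1", "s3"}}
	for _, m := range moves {
		wg.Add(1)
		go func(src, dst string) {
			defer wg.Done()
			g.acquire(src, dst)
			defer g.release(src, dst)
			fmt.Println("moving", src, "=>", dst)
			time.Sleep(10 * time.Millisecond) // simulate the copy
		}(m[0], m[1])
	}
	wg.Wait()
}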
+
+func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
 	quietSeconds := int64(quietPeriod / time.Second)
 	nowUnixSeconds := time.Now().Unix()
 
@@ -160,7 +199,18 @@ func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
 	eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
 		for _, diskInfo := range dn.DiskInfos {
 			for _, v := range diskInfo.VolumeInfos {
-				if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
+				// check collection name pattern
+				if collectionPattern != "" {
+					matched, err := filepath.Match(collectionPattern, v.Collection)
+					if err != nil {
+						return
+					}
+					if !matched {
+						continue
+					}
+				}
+
+				if v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
 					if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
 						vidMap[v.Id] = true
 					}
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 0e285214b..5497e89cc 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -20,9 +20,10 @@ type ShellOptions struct {
 	Masters        *string
 	GrpcDialOption grpc.DialOption
 	// shell transient context
-	FilerHost string
-	FilerPort int64
-	Directory string
+	FilerHost    string
+	FilerPort    int64
+	FilerAddress string
+	Directory    string
 }
 
 type CommandEnv struct {
@@ -75,7 +76,7 @@ func (ce *CommandEnv) confirmIsLocked() error {
 		return nil
 	}
 
-	return fmt.Errorf("need to lock to continue")
+	return fmt.Errorf("need to run \"lock\" first to continue")
 }
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index 1dd611ca5..765b0efda 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -31,6 +31,7 @@ func RunShell(options ShellOptions) {
 	})
 
 	line.SetCtrlCAborts(true)
+	line.SetTabCompletionStyle(liner.TabPrints)
 
 	setCompletionHandler()
 	loadHistory()
@@ -147,9 +148,11 @@ func loadHistory() {
 
 func saveHistory() {
 	if f, err := os.Create(historyPath); err != nil {
-		fmt.Printf("Error writing history file: %v\n", err)
+		fmt.Printf("Error creating history file: %v\n", err)
 	} else {
-		line.WriteHistory(f)
+		if _, err = line.WriteHistory(f); err != nil {
+			fmt.Printf("Error writing history file: %v\n", err)
+		}
 		f.Close()
 	}
 }
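Putting the flags from this merge together, a weed shell session against a cluster might look like the following. The host address, filer path, and collection pattern are purely illustrative; the flags are the ones defined in the diffs above, and "lock" is the prerequisite that confirmIsLocked enforces:

> lock
> volume.fsck -findMissingChunksInFiler -findMissingChunksInFilerPath=/buckets/b1 -v
> volume.server.evacuate -node=192.168.1.5:8080 -skipNonMoveable -retry=2 -force
> volume.tier.move -fromDiskType=hdd -toDiskType=ssd -collectionPattern="pvc-*" -quietFor=1h -fullPercent=90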
