diff options
Diffstat (limited to 'weed/shell')
75 files changed, 481 insertions, 51 deletions
diff --git a/weed/shell/command.go b/weed/shell/command.go new file mode 100644 index 000000000..cfd994f3f --- /dev/null +++ b/weed/shell/command.go @@ -0,0 +1,20 @@ +package shell + +import "io" + +type command interface { + Name() string + Help() string + Do([]string, *CommandEnv, io.Writer) error + HasTag(tag CommandTag) bool +} + +var ( + Commands = []command{} +) + +type CommandTag string + +const ( + ResourceHeavy CommandTag = "resourceHeavy" +) diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go index 5c9866d29..27a4f2bb3 100644 --- a/weed/shell/command_cluster_check.go +++ b/weed/shell/command_cluster_check.go @@ -32,6 +32,10 @@ func (c *commandClusterCheck) Help() string { ` } +func (c *commandClusterCheck) HasTag(CommandTag) bool { + return false +} + func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go index 22925da5b..5a1503612 100644 --- a/weed/shell/command_cluster_ps.go +++ b/weed/shell/command_cluster_ps.go @@ -35,6 +35,10 @@ func (c *commandClusterPs) Help() string { ` } +func (c *commandClusterPs) HasTag(CommandTag) bool { + return false +} + func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_cluster_raft_add.go b/weed/shell/command_cluster_raft_add.go index 6dce8d147..6089631b1 100644 --- a/weed/shell/command_cluster_raft_add.go +++ b/weed/shell/command_cluster_raft_add.go @@ -27,6 +27,10 @@ func (c *commandRaftServerAdd) Help() string { ` } +func (c *commandRaftServerAdd) HasTag(CommandTag) bool { + return false +} + func (c *commandRaftServerAdd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_cluster_raft_ps.go b/weed/shell/command_cluster_raft_ps.go index 58e7d7585..c8324f635 100644 --- a/weed/shell/command_cluster_raft_ps.go +++ b/weed/shell/command_cluster_raft_ps.go @@ -26,6 +26,10 @@ func (c *commandRaftClusterPs) Help() string { ` } +func (c *commandRaftClusterPs) HasTag(CommandTag) bool { + return false +} + func (c *commandRaftClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { raftClusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_cluster_raft_remove.go b/weed/shell/command_cluster_raft_remove.go index c885d145b..109125890 100644 --- a/weed/shell/command_cluster_raft_remove.go +++ b/weed/shell/command_cluster_raft_remove.go @@ -27,6 +27,10 @@ func (c *commandRaftServerRemove) Help() string { ` } +func (c *commandRaftServerRemove) HasTag(CommandTag) bool { + return false +} + func (c *commandRaftServerRemove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go index 936f35b46..0239d4c55 100644 --- a/weed/shell/command_collection_delete.go +++ b/weed/shell/command_collection_delete.go @@ -28,6 +28,10 @@ func (c *commandCollectionDelete) Help() string { ` } +func (c *commandCollectionDelete) HasTag(CommandTag) bool { + return false +} + func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { colDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go index c277d4028..32085f565 100644 --- a/weed/shell/command_collection_list.go +++ b/weed/shell/command_collection_list.go @@ -23,6 +23,10 @@ func (c *commandCollectionList) Help() string { return `list all collections` } +func (c *commandCollectionList) HasTag(CommandTag) bool { + return false +} + type CollectionInfo struct { FileCount float64 DeleteCount float64 diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 217e5750e..704027580 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -98,6 +98,10 @@ func (c *commandEcBalance) Help() string { ` } +func (c *commandEcBalance) HasTag(CommandTag) bool { + return false +} + func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index aa0ca5045..02d0f316d 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -37,6 +37,10 @@ func (c *commandEcDecode) Help() string { ` } +func (c *commandEcDecode) HasTag(CommandTag) bool { + return false +} + func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeId := decodeCommand.Int("volumeId", 0, "the volume id") diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 16de2ce73..e41529174 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -4,13 +4,14 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb" "io" "math/rand" "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -55,6 +56,10 @@ func (c *commandEcEncode) Help() string { ` } +func (c *commandEcEncode) HasTag(CommandTag) bool { + return false +} + func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -125,7 +130,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, // fmt.Printf("found ec %d shards on %v\n", vid, locations) // mark the volume as readonly - err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false) + err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false) if err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index a4dfac67c..b761ea676 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -55,6 +55,10 @@ func (c *commandEcRebuild) Help() string { ` } +func (c *commandEcRebuild) HasTag(CommandTag) bool { + return false +} + func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index bdaa757f5..47a7f3be8 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -26,6 +26,10 @@ func (c *commandFsCat) Help() string { ` } +func (c *commandFsCat) HasTag(CommandTag) bool { + return false +} + func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { path, err := commandEnv.parseUrl(findInputDirectory(args)) diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go index 2cc28f7a2..698865142 100644 --- a/weed/shell/command_fs_cd.go +++ b/weed/shell/command_fs_cd.go @@ -28,6 +28,10 @@ func (c *commandFsCd) Help() string { ` } +func (c *commandFsCd) HasTag(CommandTag) bool { + return false +} + func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { path, err := commandEnv.parseUrl(findInputDirectory(args)) diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index 99ef4a59f..b70eb88fc 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -46,6 +46,10 @@ func (c *commandFsConfigure) Help() string { ` } +func (c *commandFsConfigure) HasTag(CommandTag) bool { + return false +} + func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index e27ff6f6c..c35179f88 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -29,6 +29,10 @@ func (c *commandFsDu) Help() string { ` } +func (c *commandFsDu) HasTag(CommandTag) bool { + return false +} + func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { path, err := commandEnv.parseUrl(findInputDirectory(args)) diff --git a/weed/shell/command_fs_log.go b/weed/shell/command_fs_log.go index 5567f76e6..137a218f0 100644 --- a/weed/shell/command_fs_log.go +++ b/weed/shell/command_fs_log.go @@ -27,6 +27,10 @@ func (c *commandFsLogPurge) Help() string { ` } +func (c *commandFsLogPurge) HasTag(CommandTag) bool { + return false +} + func (c *commandFsLogPurge) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsLogPurgeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) daysAgo := fsLogPurgeCommand.Uint("daysAgo", 365, "purge logs older than N days") diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 10764175b..e007b4c9e 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -33,6 +33,10 @@ func (c *commandFsLs) Help() string { ` } +func (c *commandFsLs) HasTag(CommandTag) bool { + return false +} + func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { var isLongFormat, showHidden bool diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go index b77feb8e3..eb401aab1 100644 --- a/weed/shell/command_fs_merge_volumes.go +++ b/weed/shell/command_fs_merge_volumes.go @@ -44,6 +44,10 @@ func (c *commandFsMergeVolumes) Help() string { ` } +func (c *commandFsMergeVolumes) HasTag(CommandTag) bool { + return false +} + func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -322,7 +326,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie if err != nil { return err } - + _, err, _ = uploader.Upload(reader, &operation.UploadOption{ UploadUrl: uploadURL, Filename: filename, diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index 69eee6e01..e47d8faa6 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -30,6 +30,10 @@ func (c *commandFsMetaCat) Help() string { ` } +func (c *commandFsMetaCat) HasTag(CommandTag) bool { + return false +} + func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { path, err := commandEnv.parseUrl(findInputDirectory(args)) diff --git a/weed/shell/command_fs_meta_change_volume_id.go b/weed/shell/command_fs_meta_change_volume_id.go index 0d350644d..f1c148f5b 100644 --- a/weed/shell/command_fs_meta_change_volume_id.go +++ b/weed/shell/command_fs_meta_change_volume_id.go @@ -37,6 +37,10 @@ func (c *commandFsMetaChangeVolumeId) Help() string { ` } +func (c *commandFsMetaChangeVolumeId) HasTag(CommandTag) bool { + return false +} + func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsMetaChangeVolumeIdCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 5f6559867..d56274362 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -38,6 +38,10 @@ func (c *commandFsMetaLoad) Help() string { ` } +func (c *commandFsMetaLoad) HasTag(CommandTag) bool { + return false +} + func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if len(args) == 0 { diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index 5445810ed..d7aca21d3 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -30,6 +30,10 @@ func (c *commandFsMetaNotify) Help() string { ` } +func (c *commandFsMetaNotify) HasTag(CommandTag) bool { + return false +} + func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { path, err := commandEnv.parseUrl(findInputDirectory(args)) diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 6a2ea6e75..a8be9fe2c 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -44,6 +44,10 @@ func (c *commandFsMetaSave) Help() string { ` } +func (c *commandFsMetaSave) HasTag(CommandTag) bool { + return false +} + func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsMetaSaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go index 30cd26a20..9c33aa81c 100644 --- a/weed/shell/command_fs_mkdir.go +++ b/weed/shell/command_fs_mkdir.go @@ -27,6 +27,10 @@ func (c *commandFsMkdir) Help() string { ` } +func (c *commandFsMkdir) HasTag(CommandTag) bool { + return false +} + func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { path, err := commandEnv.parseUrl(findInputDirectory(args)) diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 8e609edc9..cb966571c 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -34,6 +34,10 @@ func (c *commandFsMv) Help() string { ` } +func (c *commandFsMv) HasTag(CommandTag) bool { + return false +} + func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if len(args) != 2 { diff --git a/weed/shell/command_fs_pwd.go b/weed/shell/command_fs_pwd.go index d7d9819c8..e74fb6c3d 100644 --- a/weed/shell/command_fs_pwd.go +++ b/weed/shell/command_fs_pwd.go @@ -20,6 +20,10 @@ func (c *commandFsPwd) Help() string { return `print out current directory` } +func (c *commandFsPwd) HasTag(CommandTag) bool { + return false +} + func (c *commandFsPwd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fmt.Fprintf(writer, "%s\n", commandEnv.option.Directory) diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go index b8445b7e9..0af75f048 100644 --- a/weed/shell/command_fs_rm.go +++ b/weed/shell/command_fs_rm.go @@ -34,6 +34,10 @@ func (c *commandFsRm) Help() string { ` } +func (c *commandFsRm) HasTag(CommandTag) bool { + return false +} + func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { isRecursive := false ignoreRecursiveError := false diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index 2b2ad2cb7..21e352af2 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -28,6 +28,10 @@ func (c *commandFsTree) Help() string { ` } +func (c *commandFsTree) HasTag(CommandTag) bool { + return false +} + func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { path, err := commandEnv.parseUrl(findInputDirectory(args)) diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go index 9b0e18f94..ea9f86c3c 100644 --- a/weed/shell/command_fs_verify.go +++ b/weed/shell/command_fs_verify.go @@ -51,6 +51,10 @@ func (c *commandFsVerify) Help() string { ` } +func (c *commandFsVerify) HasTag(CommandTag) bool { + return false +} + func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { c.env = commandEnv c.writer = writer diff --git a/weed/shell/command_lock_unlock.go b/weed/shell/command_lock_unlock.go index a214ea91c..79f3c95b1 100644 --- a/weed/shell/command_lock_unlock.go +++ b/weed/shell/command_lock_unlock.go @@ -25,6 +25,10 @@ func (c *commandLock) Help() string { ` } +func (c *commandLock) HasTag(CommandTag) bool { + return false +} + func (c *commandLock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { commandEnv.locker.RequestLock(util.DetectedHostAddress()) @@ -47,6 +51,10 @@ func (c *commandUnlock) Help() string { ` } +func (c *commandUnlock) HasTag(CommandTag) bool { + return false +} + func (c *commandUnlock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { commandEnv.locker.ReleaseLock() diff --git a/weed/shell/command_mount_configure.go b/weed/shell/command_mount_configure.go index 941b7c797..5b224c39e 100644 --- a/weed/shell/command_mount_configure.go +++ b/weed/shell/command_mount_configure.go @@ -34,6 +34,10 @@ func (c *commandMountConfigure) Help() string { ` } +func (c *commandMountConfigure) HasTag(CommandTag) bool { + return false +} + func (c *commandMountConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { mountConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_mq_balance.go b/weed/shell/command_mq_balance.go index dbe218b54..2e61e640c 100644 --- a/weed/shell/command_mq_balance.go +++ b/weed/shell/command_mq_balance.go @@ -25,6 +25,10 @@ func (c *commandMqBalanceTopics) Help() string { ` } +func (c *commandMqBalanceTopics) HasTag(CommandTag) bool { + return false +} + func (c *commandMqBalanceTopics) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error { // find the broker balancer diff --git a/weed/shell/command_mq_topic_configure.go b/weed/shell/command_mq_topic_configure.go index c5721d9d9..9342c8604 100644 --- a/weed/shell/command_mq_topic_configure.go +++ b/weed/shell/command_mq_topic_configure.go @@ -29,6 +29,10 @@ func (c *commandMqTopicConfigure) Help() string { ` } +func (c *commandMqTopicConfigure) HasTag(CommandTag) bool { + return false +} + func (c *commandMqTopicConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error { // parse parameters diff --git a/weed/shell/command_mq_topic_desc.go b/weed/shell/command_mq_topic_desc.go index cedad6ed4..8c944271c 100644 --- a/weed/shell/command_mq_topic_desc.go +++ b/weed/shell/command_mq_topic_desc.go @@ -24,6 +24,10 @@ func (c *commandMqTopicDescribe) Help() string { return `describe a topic` } +func (c *commandMqTopicDescribe) HasTag(CommandTag) bool { + return false +} + func (c *commandMqTopicDescribe) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error { // parse parameters mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_mq_topic_list.go b/weed/shell/command_mq_topic_list.go index 0a193cb4e..8da86f4a6 100644 --- a/weed/shell/command_mq_topic_list.go +++ b/weed/shell/command_mq_topic_list.go @@ -25,6 +25,10 @@ func (c *commandMqTopicList) Help() string { return `print out all topics` } +func (c *commandMqTopicList) HasTag(CommandTag) bool { + return false +} + func (c *commandMqTopicList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error { brokerBalancer, err := findBrokerBalancer(commandEnv) diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 3a44d26e4..da19af0cf 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -46,6 +46,10 @@ func (c *commandRemoteCache) Help() string { ` } +func (c *commandRemoteCache) HasTag(CommandTag) bool { + return false +} + func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go index 11f7696b2..dbc44c8bf 100644 --- a/weed/shell/command_remote_configure.go +++ b/weed/shell/command_remote_configure.go @@ -48,6 +48,10 @@ func (c *commandRemoteConfigure) Help() string { ` } +func (c *commandRemoteConfigure) HasTag(CommandTag) bool { + return false +} + var ( isAlpha = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9]*$`).MatchString ) diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go index 3b6d92870..1b2e33c14 100644 --- a/weed/shell/command_remote_meta_sync.go +++ b/weed/shell/command_remote_meta_sync.go @@ -44,6 +44,10 @@ func (c *commandRemoteMetaSync) Help() string { ` } +func (c *commandRemoteMetaSync) HasTag(CommandTag) bool { + return false +} + func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index 9dffd10eb..cfe89ddce 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -44,6 +44,10 @@ func (c *commandRemoteMount) Help() string { ` } +func (c *commandRemoteMount) HasTag(CommandTag) bool { + return false +} + func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_remote_mount_buckets.go b/weed/shell/command_remote_mount_buckets.go index 91375d2d2..d8df09e60 100644 --- a/weed/shell/command_remote_mount_buckets.go +++ b/weed/shell/command_remote_mount_buckets.go @@ -38,6 +38,10 @@ func (c *commandRemoteMountBuckets) Help() string { ` } +func (c *commandRemoteMountBuckets) HasTag(CommandTag) bool { + return false +} + func (c *commandRemoteMountBuckets) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { remoteMountBucketsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go index 25e51ff74..86d992ef1 100644 --- a/weed/shell/command_remote_uncache.go +++ b/weed/shell/command_remote_uncache.go @@ -41,6 +41,10 @@ func (c *commandRemoteUncache) Help() string { ` } +func (c *commandRemoteUncache) HasTag(CommandTag) bool { + return false +} + func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { remoteUncacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -165,12 +169,12 @@ func (ff *FileFilter) matches(entry *filer_pb.Entry) bool { } } if *ff.minAge != -1 { - if entry.Attributes.Crtime + *ff.minAge > time.Now().Unix() { + if entry.Attributes.Crtime+*ff.minAge > time.Now().Unix() { return false } } if *ff.maxAge != -1 { - if entry.Attributes.Crtime + *ff.maxAge < time.Now().Unix() { + if entry.Attributes.Crtime+*ff.maxAge < time.Now().Unix() { return false } } diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go index f461b09de..6de92e612 100644 --- a/weed/shell/command_remote_unmount.go +++ b/weed/shell/command_remote_unmount.go @@ -37,6 +37,10 @@ func (c *commandRemoteUnmount) Help() string { ` } +func (c *commandRemoteUnmount) HasTag(CommandTag) bool { + return false +} + func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_s3_bucket_create.go b/weed/shell/command_s3_bucket_create.go index 80c8a043b..86c788b0d 100644 --- a/weed/shell/command_s3_bucket_create.go +++ b/weed/shell/command_s3_bucket_create.go @@ -30,6 +30,10 @@ func (c *commandS3BucketCreate) Help() string { ` } +func (c *commandS3BucketCreate) HasTag(CommandTag) bool { + return false +} + func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go index 3b2a9c6e9..6f8c9c4b5 100644 --- a/weed/shell/command_s3_bucket_delete.go +++ b/weed/shell/command_s3_bucket_delete.go @@ -28,6 +28,10 @@ func (c *commandS3BucketDelete) Help() string { ` } +func (c *commandS3BucketDelete) HasTag(CommandTag) bool { + return false +} + func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go index bf21a3a29..7939f2086 100644 --- a/weed/shell/command_s3_bucket_list.go +++ b/weed/shell/command_s3_bucket_list.go @@ -27,6 +27,10 @@ func (c *commandS3BucketList) Help() string { ` } +func (c *commandS3BucketList) HasTag(CommandTag) bool { + return false +} + func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_s3_bucket_quota.go b/weed/shell/command_s3_bucket_quota.go index d603c8156..3301a8d0e 100644 --- a/weed/shell/command_s3_bucket_quota.go +++ b/weed/shell/command_s3_bucket_quota.go @@ -28,6 +28,10 @@ func (c *commandS3BucketQuota) Help() string { ` } +func (c *commandS3BucketQuota) HasTag(CommandTag) bool { + return false +} + func (c *commandS3BucketQuota) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_s3_bucket_quota_check.go b/weed/shell/command_s3_bucket_quota_check.go index b130e4fad..04708a0e7 100644 --- a/weed/shell/command_s3_bucket_quota_check.go +++ b/weed/shell/command_s3_bucket_quota_check.go @@ -29,6 +29,10 @@ func (c *commandS3BucketQuotaEnforce) Help() string { ` } +func (c *commandS3BucketQuotaEnforce) HasTag(CommandTag) bool { + return false +} + func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_s3_circuitbreaker.go b/weed/shell/command_s3_circuitbreaker.go index 2f326b079..5abc2d429 100644 --- a/weed/shell/command_s3_circuitbreaker.go +++ b/weed/shell/command_s3_circuitbreaker.go @@ -51,6 +51,10 @@ func (c *commandS3CircuitBreaker) Help() string { ` } +func (c *commandS3CircuitBreaker) HasTag(CommandTag) bool { + return false +} + func (c *commandS3CircuitBreaker) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { dir := s3_constants.CircuitBreakerConfigDir file := s3_constants.CircuitBreakerConfigFile diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go index accce60ba..aa296dd67 100644 --- a/weed/shell/command_s3_clean_uploads.go +++ b/weed/shell/command_s3_clean_uploads.go @@ -34,6 +34,10 @@ func (c *commandS3CleanUploads) Help() string { ` } +func (c *commandS3CleanUploads) HasTag(CommandTag) bool { + return false +} + func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"") diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go index 33c5a4cfc..0726bd143 100644 --- a/weed/shell/command_s3_configure.go +++ b/weed/shell/command_s3_configure.go @@ -33,6 +33,10 @@ func (c *commandS3Configure) Help() string { ` } +func (c *commandS3Configure) HasTag(CommandTag) bool { + return false +} + func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { s3ConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index e9a483f41..bea6f33c3 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -64,6 +64,10 @@ func (c *commandVolumeBalance) Help() string { ` } +func (c *commandVolumeBalance) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 0e76f6ac9..f72dff243 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -5,6 +5,12 @@ import ( "context" "flag" "fmt" + "io" + "math" + "net/http" + "sync" + "time" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -13,11 +19,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "golang.org/x/exp/slices" "google.golang.org/grpc" - "io" - "math" - "net/http" - "sync" - "time" ) func init() { @@ -47,6 +48,10 @@ func (c *commandVolumeCheckDisk) Help() string { ` } +func (c *commandVolumeCheckDisk) HasTag(tag CommandTag) bool { + return tag == ResourceHeavy +} + func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) { err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ @@ -141,23 +146,35 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) { continue } - slices.SortFunc(replicas, func(a, b *VolumeReplica) int { + // filter readonly replica + var writableReplicas []*VolumeReplica + for _, replica := range replicas { + if replica.info.ReadOnly { + fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) + } else { + writableReplicas = append(writableReplicas, replica) + } + } + + slices.SortFunc(writableReplicas, func(a, b *VolumeReplica) int { return int(b.info.FileCount - a.info.FileCount) }) - for len(replicas) >= 2 { - a, b := replicas[0], replicas[1] - replicas = replicas[1:] - if a.info.ReadOnly || b.info.ReadOnly { - fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n", - a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) - continue - } + for len(writableReplicas) >= 2 { + a, b := writableReplicas[0], writableReplicas[1] if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) { + // always choose the larger volume to be the source + writableReplicas = append(replicas[:1], writableReplicas[2:]...) continue } if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil { fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } + // always choose the larger volume to be the source + if a.info.FileCount > b.info.FileCount { + writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) + } else { + writableReplicas = writableReplicas[1:] + } } } @@ -191,13 +208,15 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a } // find and make up the differences - if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { - return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err) + aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) + bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) + if err1 != nil { + return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1) } - if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { - return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err) + if err2 != nil { + return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2) } - return + return aHasChanges, bHasChanges, nil } func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) { diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index a6acd6838..29d7ebac4 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -35,6 +35,10 @@ func (c *commandVolumeConfigureReplication) Help() string { ` } +func (c *commandVolumeConfigureReplication) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, _ io.Writer) (err error) { configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go index 59193f6bc..be00cb18f 100644 --- a/weed/shell/command_volume_copy.go +++ b/weed/shell/command_volume_copy.go @@ -31,6 +31,10 @@ func (c *commandVolumeCopy) Help() string { ` } +func (c *commandVolumeCopy) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volCopyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go index 159981c93..719420bcc 100644 --- a/weed/shell/command_volume_delete.go +++ b/weed/shell/command_volume_delete.go @@ -29,6 +29,10 @@ func (c *commandVolumeDelete) Help() string { ` } +func (c *commandVolumeDelete) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_delete_empty.go b/weed/shell/command_volume_delete_empty.go index bb0a194e0..fc1d50e64 100644 --- a/weed/shell/command_volume_delete_empty.go +++ b/weed/shell/command_volume_delete_empty.go @@ -32,6 +32,10 @@ func (c *commandVolumeDeleteEmpty) Help() string { ` } +func (c *commandVolumeDeleteEmpty) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 074931f40..2c35b3bcd 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -55,6 +55,10 @@ func (c *commandVolumeFixReplication) Help() string { ` } +func (c *commandVolumeFixReplication) HasTag(tag CommandTag) bool { + return false && tag == ResourceHeavy // resource intensive only when deleting and checking with replicas. +} + func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -99,7 +103,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, replica := replicas[0] replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) switch { - case replicaPlacement.GetCopyCount() > len(replicas): + case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas): underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) case isMisplaced(replicas, replicaPlacement): misplacedVolumeIds = append(misplacedVolumeIds, vid) @@ -377,6 +381,27 @@ func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) { }) } +func satisfyReplicaCurrentLocation(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica) bool { + existingDataCenters, existingRacks, _ := countReplicas(replicas) + + if replicaPlacement.DiffDataCenterCount+1 > len(existingDataCenters) { + return false + } + if replicaPlacement.DiffRackCount+1 > len(existingRacks) { + return false + } + if replicaPlacement.SameRackCount > 0 { + foundSatisfyRack := false + for _, rackCount := range existingRacks { + if rackCount >= replicaPlacement.SameRackCount+1 { + foundSatisfyRack = true + } + } + return foundSatisfyRack + } + return true +} + /* if on an existing data node { return false diff --git a/weed/shell/command_volume_fix_replication_test.go b/weed/shell/command_volume_fix_replication_test.go index 97f270306..5f2318c32 100644 --- a/weed/shell/command_volume_fix_replication_test.go +++ b/weed/shell/command_volume_fix_replication_test.go @@ -438,3 +438,90 @@ func TestPickingMisplacedVolumeToDelete(t *testing.T) { } } + +func TestSatisfyReplicaCurrentLocation(t *testing.T) { + + var tests = []testcase{ + { + name: "test 001", + replication: "001", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + expected: false, + }, + { + name: "test 010", + replication: "010", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + expected: true, + }, + { + name: "test 011", + replication: "011", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, + }, + expected: true, + }, + { + name: "test 110", + replication: "110", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + { + location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, + }, + expected: true, + }, + { + name: "test 100", + replication: "100", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication) + if satisfyReplicaCurrentLocation(replicaPlacement, tt.replicas) != tt.expected { + t.Errorf("%s: expect %v %v %+v", + tt.name, tt.expected, tt.replication, tt.replicas) + } + }) + } +} diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index acb0ee5ad..30d3ecd11 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -60,7 +60,7 @@ func (c *commandVolumeFsck) Name() string { } func (c *commandVolumeFsck) Help() string { - return `check all volumes to find entries not used by the filer + return `check all volumes to find entries not used by the filer. It is optional and resource intensive. Important assumption!!! the system is all used by one filer. @@ -79,6 +79,10 @@ func (c *commandVolumeFsck) Help() string { ` } +func (c *commandVolumeFsck) HasTag(tag CommandTag) bool { + return tag == ResourceHeavy +} + func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -359,12 +363,12 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn needleVID := needle.VolumeId(volumeId) if isReadOnlyReplicas[volumeId] { - err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true) + err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true, false) if err != nil { return fmt.Errorf("mark volume %d read/write: %v", volumeId, err) } fmt.Fprintf(c.writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server) - defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false) + defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false, false) fmt.Fprintf(c.writer, "marked %d on server %v writable for forced purge\n", volumeId, server) } diff --git a/weed/shell/command_volume_grow.go b/weed/shell/command_volume_grow.go index 026a59b7b..5b351e6ab 100644 --- a/weed/shell/command_volume_grow.go +++ b/weed/shell/command_volume_grow.go @@ -30,6 +30,10 @@ func (c *commandGrow) Help() string { ` } +func (c *commandGrow) HasTag(CommandTag) bool { + return false +} + func (c *commandGrow) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index 83e3a0d6f..1e5238f39 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -39,6 +39,10 @@ func (c *commandVolumeList) Help() string { ` } +func (c *commandVolumeList) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volumeListCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_mark.go b/weed/shell/command_volume_mark.go index 1716cf9a6..9bc35e6c1 100644 --- a/weed/shell/command_volume_mark.go +++ b/weed/shell/command_volume_mark.go @@ -3,9 +3,10 @@ package shell import ( "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" "io" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) @@ -27,6 +28,10 @@ func (c *commandVolumeMark) Help() string { ` } +func (c *commandVolumeMark) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volMarkCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -52,5 +57,5 @@ func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io. volumeId := needle.VolumeId(*volumeIdInt) - return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable) + return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable, true) } diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index e4bad222e..0b62905de 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -33,6 +33,10 @@ func (c *commandVolumeMount) Help() string { ` } +func (c *commandVolumeMount) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 97eaf211e..cf9991695 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -4,12 +4,13 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/wdclient" "io" "log" "time" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" @@ -48,6 +49,10 @@ func (c *commandVolumeMove) Help() string { ` } +func (c *commandVolumeMove) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volMoveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -132,6 +137,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl shouldMarkWritable = true _, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), + Persist: false, }) return readonlyErr } @@ -197,7 +203,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour }) } -func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) { +func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable, persist bool) (err error) { return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if writable { _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ @@ -206,16 +212,17 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId } else { _, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), + Persist: persist, }) } return err }) } -func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error { +func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error { for _, location := range locations { fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) - if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable); err != nil { + if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil { return err } } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index bad695cd7..3b593d9be 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -46,6 +46,10 @@ func (c *commandVolumeServerEvacuate) Help() string { ` } +func (c *commandVolumeServerEvacuate) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go index d0dd023af..e7c979cad 100644 --- a/weed/shell/command_volume_server_leave.go +++ b/weed/shell/command_volume_server_leave.go @@ -35,6 +35,10 @@ func (c *commandVolumeServerLeave) Help() string { ` } +func (c *commandVolumeServerLeave) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { vsLeaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index e60a74735..2878d16fe 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -41,6 +41,10 @@ func (c *commandVolumeTierDownload) Help() string { ` } +func (c *commandVolumeTierDownload) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index c6364757f..d5087f0ec 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -53,6 +53,10 @@ func (c *commandVolumeTierMove) Help() string { ` } +func (c *commandVolumeTierMove) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -234,14 +238,14 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i } // mark all replicas as read only - if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil { + if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } newAddress := pb.NewServerAddressFromDataNode(dst.dataNode) if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil { // mark all replicas as writable - if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil { + if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true, false); err != nil { glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err) } diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index cb805b0cf..5ef1c5209 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -4,10 +4,11 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" "io" "time" + "github.com/seaweedfs/seaweedfs/weed/pb" + "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -55,6 +56,10 @@ func (c *commandVolumeTierUpload) Help() string { ` } +func (c *commandVolumeTierUpload) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -102,7 +107,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str return fmt.Errorf("volume %d not found", vid) } - err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false) + err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false, false) if err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, existingLocations[0].Url, err) } diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index 4b079b944..ccb83bbae 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -34,6 +34,10 @@ func (c *commandVolumeUnmount) Help() string { ` } +func (c *commandVolumeUnmount) HasTag(CommandTag) bool { + return false +} + func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volUnmountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go index eb95e3d3d..e55bf1ec9 100644 --- a/weed/shell/command_volume_vacuum.go +++ b/weed/shell/command_volume_vacuum.go @@ -27,6 +27,10 @@ func (c *commandVacuum) Help() string { ` } +func (c *commandVacuum) HasTag(CommandTag) bool { + return false +} + func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) diff --git a/weed/shell/command_volume_vacuum_disable.go b/weed/shell/command_volume_vacuum_disable.go index ddae744e5..15897ddb0 100644 --- a/weed/shell/command_volume_vacuum_disable.go +++ b/weed/shell/command_volume_vacuum_disable.go @@ -26,6 +26,10 @@ func (c *commandDisableVacuum) Help() string { ` } +func (c *commandDisableVacuum) HasTag(CommandTag) bool { + return false +} + func (c *commandDisableVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if err = commandEnv.confirmIsLocked(args); err != nil { diff --git a/weed/shell/command_volume_vacuum_enable.go b/weed/shell/command_volume_vacuum_enable.go index 03284c92f..b8f779b14 100644 --- a/weed/shell/command_volume_vacuum_enable.go +++ b/weed/shell/command_volume_vacuum_enable.go @@ -26,6 +26,10 @@ func (c *commandEnableVacuum) Help() string { ` } +func (c *commandEnableVacuum) HasTag(CommandTag) bool { + return false +} + func (c *commandEnableVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { if err = commandEnv.confirmIsLocked(args); err != nil { diff --git a/weed/shell/commands.go b/weed/shell/commands.go index e6e582376..582ee560d 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -6,7 +6,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" - "io" "net/url" "strconv" "strings" @@ -38,16 +37,6 @@ type CommandEnv struct { locker *exclusive_locks.ExclusiveLocker } -type command interface { - Name() string - Help() string - Do([]string, *CommandEnv, io.Writer) error -} - -var ( - Commands = []command{} -) - func NewCommandEnv(options *ShellOptions) *CommandEnv { ce := &CommandEnv{ env: make(map[string]string), |
