aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command.go20
-rw-r--r--weed/shell/command_cluster_check.go4
-rw-r--r--weed/shell/command_cluster_ps.go4
-rw-r--r--weed/shell/command_cluster_raft_add.go4
-rw-r--r--weed/shell/command_cluster_raft_ps.go4
-rw-r--r--weed/shell/command_cluster_raft_remove.go4
-rw-r--r--weed/shell/command_collection_delete.go4
-rw-r--r--weed/shell/command_collection_list.go4
-rw-r--r--weed/shell/command_ec_balance.go4
-rw-r--r--weed/shell/command_ec_decode.go4
-rw-r--r--weed/shell/command_ec_encode.go11
-rw-r--r--weed/shell/command_ec_rebuild.go4
-rw-r--r--weed/shell/command_fs_cat.go4
-rw-r--r--weed/shell/command_fs_cd.go4
-rw-r--r--weed/shell/command_fs_configure.go4
-rw-r--r--weed/shell/command_fs_du.go4
-rw-r--r--weed/shell/command_fs_log.go4
-rw-r--r--weed/shell/command_fs_ls.go4
-rw-r--r--weed/shell/command_fs_merge_volumes.go6
-rw-r--r--weed/shell/command_fs_meta_cat.go4
-rw-r--r--weed/shell/command_fs_meta_change_volume_id.go4
-rw-r--r--weed/shell/command_fs_meta_load.go4
-rw-r--r--weed/shell/command_fs_meta_notify.go4
-rw-r--r--weed/shell/command_fs_meta_save.go4
-rw-r--r--weed/shell/command_fs_mkdir.go4
-rw-r--r--weed/shell/command_fs_mv.go4
-rw-r--r--weed/shell/command_fs_pwd.go4
-rw-r--r--weed/shell/command_fs_rm.go4
-rw-r--r--weed/shell/command_fs_tree.go4
-rw-r--r--weed/shell/command_fs_verify.go4
-rw-r--r--weed/shell/command_lock_unlock.go8
-rw-r--r--weed/shell/command_mount_configure.go4
-rw-r--r--weed/shell/command_mq_balance.go4
-rw-r--r--weed/shell/command_mq_topic_configure.go4
-rw-r--r--weed/shell/command_mq_topic_desc.go4
-rw-r--r--weed/shell/command_mq_topic_list.go4
-rw-r--r--weed/shell/command_remote_cache.go4
-rw-r--r--weed/shell/command_remote_configure.go4
-rw-r--r--weed/shell/command_remote_meta_sync.go4
-rw-r--r--weed/shell/command_remote_mount.go4
-rw-r--r--weed/shell/command_remote_mount_buckets.go4
-rw-r--r--weed/shell/command_remote_uncache.go8
-rw-r--r--weed/shell/command_remote_unmount.go4
-rw-r--r--weed/shell/command_s3_bucket_create.go4
-rw-r--r--weed/shell/command_s3_bucket_delete.go4
-rw-r--r--weed/shell/command_s3_bucket_list.go4
-rw-r--r--weed/shell/command_s3_bucket_quota.go4
-rw-r--r--weed/shell/command_s3_bucket_quota_check.go4
-rw-r--r--weed/shell/command_s3_circuitbreaker.go4
-rw-r--r--weed/shell/command_s3_clean_uploads.go4
-rw-r--r--weed/shell/command_s3_configure.go4
-rw-r--r--weed/shell/command_volume_balance.go4
-rw-r--r--weed/shell/command_volume_check_disk.go57
-rw-r--r--weed/shell/command_volume_configure_replication.go4
-rw-r--r--weed/shell/command_volume_copy.go4
-rw-r--r--weed/shell/command_volume_delete.go4
-rw-r--r--weed/shell/command_volume_delete_empty.go4
-rw-r--r--weed/shell/command_volume_fix_replication.go27
-rw-r--r--weed/shell/command_volume_fix_replication_test.go87
-rw-r--r--weed/shell/command_volume_fsck.go10
-rw-r--r--weed/shell/command_volume_grow.go4
-rw-r--r--weed/shell/command_volume_list.go4
-rw-r--r--weed/shell/command_volume_mark.go9
-rw-r--r--weed/shell/command_volume_mount.go4
-rw-r--r--weed/shell/command_volume_move.go17
-rw-r--r--weed/shell/command_volume_server_evacuate.go4
-rw-r--r--weed/shell/command_volume_server_leave.go4
-rw-r--r--weed/shell/command_volume_tier_download.go4
-rw-r--r--weed/shell/command_volume_tier_move.go8
-rw-r--r--weed/shell/command_volume_tier_upload.go9
-rw-r--r--weed/shell/command_volume_unmount.go4
-rw-r--r--weed/shell/command_volume_vacuum.go4
-rw-r--r--weed/shell/command_volume_vacuum_disable.go4
-rw-r--r--weed/shell/command_volume_vacuum_enable.go4
-rw-r--r--weed/shell/commands.go11
75 files changed, 481 insertions, 51 deletions
diff --git a/weed/shell/command.go b/weed/shell/command.go
new file mode 100644
index 000000000..cfd994f3f
--- /dev/null
+++ b/weed/shell/command.go
@@ -0,0 +1,20 @@
+package shell
+
+import "io"
+
+type command interface {
+ Name() string
+ Help() string
+ Do([]string, *CommandEnv, io.Writer) error
+ HasTag(tag CommandTag) bool
+}
+
+var (
+ Commands = []command{}
+)
+
+type CommandTag string
+
+const (
+ ResourceHeavy CommandTag = "resourceHeavy"
+)
diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go
index 5c9866d29..27a4f2bb3 100644
--- a/weed/shell/command_cluster_check.go
+++ b/weed/shell/command_cluster_check.go
@@ -32,6 +32,10 @@ func (c *commandClusterCheck) Help() string {
`
}
+func (c *commandClusterCheck) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go
index 22925da5b..5a1503612 100644
--- a/weed/shell/command_cluster_ps.go
+++ b/weed/shell/command_cluster_ps.go
@@ -35,6 +35,10 @@ func (c *commandClusterPs) Help() string {
`
}
+func (c *commandClusterPs) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_cluster_raft_add.go b/weed/shell/command_cluster_raft_add.go
index 6dce8d147..6089631b1 100644
--- a/weed/shell/command_cluster_raft_add.go
+++ b/weed/shell/command_cluster_raft_add.go
@@ -27,6 +27,10 @@ func (c *commandRaftServerAdd) Help() string {
`
}
+func (c *commandRaftServerAdd) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandRaftServerAdd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_cluster_raft_ps.go b/weed/shell/command_cluster_raft_ps.go
index 58e7d7585..c8324f635 100644
--- a/weed/shell/command_cluster_raft_ps.go
+++ b/weed/shell/command_cluster_raft_ps.go
@@ -26,6 +26,10 @@ func (c *commandRaftClusterPs) Help() string {
`
}
+func (c *commandRaftClusterPs) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandRaftClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
raftClusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_cluster_raft_remove.go b/weed/shell/command_cluster_raft_remove.go
index c885d145b..109125890 100644
--- a/weed/shell/command_cluster_raft_remove.go
+++ b/weed/shell/command_cluster_raft_remove.go
@@ -27,6 +27,10 @@ func (c *commandRaftServerRemove) Help() string {
`
}
+func (c *commandRaftServerRemove) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandRaftServerRemove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index 936f35b46..0239d4c55 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -28,6 +28,10 @@ func (c *commandCollectionDelete) Help() string {
`
}
+func (c *commandCollectionDelete) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
colDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
index c277d4028..32085f565 100644
--- a/weed/shell/command_collection_list.go
+++ b/weed/shell/command_collection_list.go
@@ -23,6 +23,10 @@ func (c *commandCollectionList) Help() string {
return `list all collections`
}
+func (c *commandCollectionList) HasTag(CommandTag) bool {
+ return false
+}
+
type CollectionInfo struct {
FileCount float64
DeleteCount float64
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 217e5750e..704027580 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -98,6 +98,10 @@ func (c *commandEcBalance) Help() string {
`
}
+func (c *commandEcBalance) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index aa0ca5045..02d0f316d 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -37,6 +37,10 @@ func (c *commandEcDecode) Help() string {
`
}
+func (c *commandEcDecode) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 16de2ce73..e41529174 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -4,13 +4,14 @@ import (
"context"
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"math/rand"
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -55,6 +56,10 @@ func (c *commandEcEncode) Help() string {
`
}
+func (c *commandEcEncode) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -125,7 +130,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly
- err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false)
+ err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index a4dfac67c..b761ea676 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -55,6 +55,10 @@ func (c *commandEcRebuild) Help() string {
`
}
+func (c *commandEcRebuild) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go
index bdaa757f5..47a7f3be8 100644
--- a/weed/shell/command_fs_cat.go
+++ b/weed/shell/command_fs_cat.go
@@ -26,6 +26,10 @@ func (c *commandFsCat) Help() string {
`
}
+func (c *commandFsCat) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))
diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go
index 2cc28f7a2..698865142 100644
--- a/weed/shell/command_fs_cd.go
+++ b/weed/shell/command_fs_cd.go
@@ -28,6 +28,10 @@ func (c *commandFsCd) Help() string {
`
}
+func (c *commandFsCd) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index 99ef4a59f..b70eb88fc 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -46,6 +46,10 @@ func (c *commandFsConfigure) Help() string {
`
}
+func (c *commandFsConfigure) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
index e27ff6f6c..c35179f88 100644
--- a/weed/shell/command_fs_du.go
+++ b/weed/shell/command_fs_du.go
@@ -29,6 +29,10 @@ func (c *commandFsDu) Help() string {
`
}
+func (c *commandFsDu) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))
diff --git a/weed/shell/command_fs_log.go b/weed/shell/command_fs_log.go
index 5567f76e6..137a218f0 100644
--- a/weed/shell/command_fs_log.go
+++ b/weed/shell/command_fs_log.go
@@ -27,6 +27,10 @@ func (c *commandFsLogPurge) Help() string {
`
}
+func (c *commandFsLogPurge) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsLogPurge) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsLogPurgeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
daysAgo := fsLogPurgeCommand.Uint("daysAgo", 365, "purge logs older than N days")
diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go
index 10764175b..e007b4c9e 100644
--- a/weed/shell/command_fs_ls.go
+++ b/weed/shell/command_fs_ls.go
@@ -33,6 +33,10 @@ func (c *commandFsLs) Help() string {
`
}
+func (c *commandFsLs) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
var isLongFormat, showHidden bool
diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go
index b77feb8e3..eb401aab1 100644
--- a/weed/shell/command_fs_merge_volumes.go
+++ b/weed/shell/command_fs_merge_volumes.go
@@ -44,6 +44,10 @@ func (c *commandFsMergeVolumes) Help() string {
`
}
+func (c *commandFsMergeVolumes) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -322,7 +326,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
if err != nil {
return err
}
-
+
_, err, _ = uploader.Upload(reader, &operation.UploadOption{
UploadUrl: uploadURL,
Filename: filename,
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index 69eee6e01..e47d8faa6 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -30,6 +30,10 @@ func (c *commandFsMetaCat) Help() string {
`
}
+func (c *commandFsMetaCat) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))
diff --git a/weed/shell/command_fs_meta_change_volume_id.go b/weed/shell/command_fs_meta_change_volume_id.go
index 0d350644d..f1c148f5b 100644
--- a/weed/shell/command_fs_meta_change_volume_id.go
+++ b/weed/shell/command_fs_meta_change_volume_id.go
@@ -37,6 +37,10 @@ func (c *commandFsMetaChangeVolumeId) Help() string {
`
}
+func (c *commandFsMetaChangeVolumeId) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsMetaChangeVolumeIdCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 5f6559867..d56274362 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -38,6 +38,10 @@ func (c *commandFsMetaLoad) Help() string {
`
}
+func (c *commandFsMetaLoad) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) == 0 {
diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go
index 5445810ed..d7aca21d3 100644
--- a/weed/shell/command_fs_meta_notify.go
+++ b/weed/shell/command_fs_meta_notify.go
@@ -30,6 +30,10 @@ func (c *commandFsMetaNotify) Help() string {
`
}
+func (c *commandFsMetaNotify) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index 6a2ea6e75..a8be9fe2c 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -44,6 +44,10 @@ func (c *commandFsMetaSave) Help() string {
`
}
+func (c *commandFsMetaSave) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsMetaSaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go
index 30cd26a20..9c33aa81c 100644
--- a/weed/shell/command_fs_mkdir.go
+++ b/weed/shell/command_fs_mkdir.go
@@ -27,6 +27,10 @@ func (c *commandFsMkdir) Help() string {
`
}
+func (c *commandFsMkdir) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))
diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go
index 8e609edc9..cb966571c 100644
--- a/weed/shell/command_fs_mv.go
+++ b/weed/shell/command_fs_mv.go
@@ -34,6 +34,10 @@ func (c *commandFsMv) Help() string {
`
}
+func (c *commandFsMv) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) != 2 {
diff --git a/weed/shell/command_fs_pwd.go b/weed/shell/command_fs_pwd.go
index d7d9819c8..e74fb6c3d 100644
--- a/weed/shell/command_fs_pwd.go
+++ b/weed/shell/command_fs_pwd.go
@@ -20,6 +20,10 @@ func (c *commandFsPwd) Help() string {
return `print out current directory`
}
+func (c *commandFsPwd) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsPwd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fmt.Fprintf(writer, "%s\n", commandEnv.option.Directory)
diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go
index b8445b7e9..0af75f048 100644
--- a/weed/shell/command_fs_rm.go
+++ b/weed/shell/command_fs_rm.go
@@ -34,6 +34,10 @@ func (c *commandFsRm) Help() string {
`
}
+func (c *commandFsRm) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
isRecursive := false
ignoreRecursiveError := false
diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go
index 2b2ad2cb7..21e352af2 100644
--- a/weed/shell/command_fs_tree.go
+++ b/weed/shell/command_fs_tree.go
@@ -28,6 +28,10 @@ func (c *commandFsTree) Help() string {
`
}
+func (c *commandFsTree) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))
diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go
index 9b0e18f94..ea9f86c3c 100644
--- a/weed/shell/command_fs_verify.go
+++ b/weed/shell/command_fs_verify.go
@@ -51,6 +51,10 @@ func (c *commandFsVerify) Help() string {
`
}
+func (c *commandFsVerify) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
c.env = commandEnv
c.writer = writer
diff --git a/weed/shell/command_lock_unlock.go b/weed/shell/command_lock_unlock.go
index a214ea91c..79f3c95b1 100644
--- a/weed/shell/command_lock_unlock.go
+++ b/weed/shell/command_lock_unlock.go
@@ -25,6 +25,10 @@ func (c *commandLock) Help() string {
`
}
+func (c *commandLock) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandLock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
commandEnv.locker.RequestLock(util.DetectedHostAddress())
@@ -47,6 +51,10 @@ func (c *commandUnlock) Help() string {
`
}
+func (c *commandUnlock) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandUnlock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
commandEnv.locker.ReleaseLock()
diff --git a/weed/shell/command_mount_configure.go b/weed/shell/command_mount_configure.go
index 941b7c797..5b224c39e 100644
--- a/weed/shell/command_mount_configure.go
+++ b/weed/shell/command_mount_configure.go
@@ -34,6 +34,10 @@ func (c *commandMountConfigure) Help() string {
`
}
+func (c *commandMountConfigure) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandMountConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
mountConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_mq_balance.go b/weed/shell/command_mq_balance.go
index dbe218b54..2e61e640c 100644
--- a/weed/shell/command_mq_balance.go
+++ b/weed/shell/command_mq_balance.go
@@ -25,6 +25,10 @@ func (c *commandMqBalanceTopics) Help() string {
`
}
+func (c *commandMqBalanceTopics) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandMqBalanceTopics) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
// find the broker balancer
diff --git a/weed/shell/command_mq_topic_configure.go b/weed/shell/command_mq_topic_configure.go
index c5721d9d9..9342c8604 100644
--- a/weed/shell/command_mq_topic_configure.go
+++ b/weed/shell/command_mq_topic_configure.go
@@ -29,6 +29,10 @@ func (c *commandMqTopicConfigure) Help() string {
`
}
+func (c *commandMqTopicConfigure) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandMqTopicConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
// parse parameters
diff --git a/weed/shell/command_mq_topic_desc.go b/weed/shell/command_mq_topic_desc.go
index cedad6ed4..8c944271c 100644
--- a/weed/shell/command_mq_topic_desc.go
+++ b/weed/shell/command_mq_topic_desc.go
@@ -24,6 +24,10 @@ func (c *commandMqTopicDescribe) Help() string {
return `describe a topic`
}
+func (c *commandMqTopicDescribe) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandMqTopicDescribe) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
// parse parameters
mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_mq_topic_list.go b/weed/shell/command_mq_topic_list.go
index 0a193cb4e..8da86f4a6 100644
--- a/weed/shell/command_mq_topic_list.go
+++ b/weed/shell/command_mq_topic_list.go
@@ -25,6 +25,10 @@ func (c *commandMqTopicList) Help() string {
return `print out all topics`
}
+func (c *commandMqTopicList) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandMqTopicList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
brokerBalancer, err := findBrokerBalancer(commandEnv)
diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go
index 3a44d26e4..da19af0cf 100644
--- a/weed/shell/command_remote_cache.go
+++ b/weed/shell/command_remote_cache.go
@@ -46,6 +46,10 @@ func (c *commandRemoteCache) Help() string {
`
}
+func (c *commandRemoteCache) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go
index 11f7696b2..dbc44c8bf 100644
--- a/weed/shell/command_remote_configure.go
+++ b/weed/shell/command_remote_configure.go
@@ -48,6 +48,10 @@ func (c *commandRemoteConfigure) Help() string {
`
}
+func (c *commandRemoteConfigure) HasTag(CommandTag) bool {
+ return false
+}
+
var (
isAlpha = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9]*$`).MatchString
)
diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go
index 3b6d92870..1b2e33c14 100644
--- a/weed/shell/command_remote_meta_sync.go
+++ b/weed/shell/command_remote_meta_sync.go
@@ -44,6 +44,10 @@ func (c *commandRemoteMetaSync) Help() string {
`
}
+func (c *commandRemoteMetaSync) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go
index 9dffd10eb..cfe89ddce 100644
--- a/weed/shell/command_remote_mount.go
+++ b/weed/shell/command_remote_mount.go
@@ -44,6 +44,10 @@ func (c *commandRemoteMount) Help() string {
`
}
+func (c *commandRemoteMount) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_remote_mount_buckets.go b/weed/shell/command_remote_mount_buckets.go
index 91375d2d2..d8df09e60 100644
--- a/weed/shell/command_remote_mount_buckets.go
+++ b/weed/shell/command_remote_mount_buckets.go
@@ -38,6 +38,10 @@ func (c *commandRemoteMountBuckets) Help() string {
`
}
+func (c *commandRemoteMountBuckets) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandRemoteMountBuckets) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMountBucketsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go
index 25e51ff74..86d992ef1 100644
--- a/weed/shell/command_remote_uncache.go
+++ b/weed/shell/command_remote_uncache.go
@@ -41,6 +41,10 @@ func (c *commandRemoteUncache) Help() string {
`
}
+func (c *commandRemoteUncache) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteUncacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -165,12 +169,12 @@ func (ff *FileFilter) matches(entry *filer_pb.Entry) bool {
}
}
if *ff.minAge != -1 {
- if entry.Attributes.Crtime + *ff.minAge > time.Now().Unix() {
+ if entry.Attributes.Crtime+*ff.minAge > time.Now().Unix() {
return false
}
}
if *ff.maxAge != -1 {
- if entry.Attributes.Crtime + *ff.maxAge < time.Now().Unix() {
+ if entry.Attributes.Crtime+*ff.maxAge < time.Now().Unix() {
return false
}
}
diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go
index f461b09de..6de92e612 100644
--- a/weed/shell/command_remote_unmount.go
+++ b/weed/shell/command_remote_unmount.go
@@ -37,6 +37,10 @@ func (c *commandRemoteUnmount) Help() string {
`
}
+func (c *commandRemoteUnmount) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_s3_bucket_create.go b/weed/shell/command_s3_bucket_create.go
index 80c8a043b..86c788b0d 100644
--- a/weed/shell/command_s3_bucket_create.go
+++ b/weed/shell/command_s3_bucket_create.go
@@ -30,6 +30,10 @@ func (c *commandS3BucketCreate) Help() string {
`
}
+func (c *commandS3BucketCreate) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go
index 3b2a9c6e9..6f8c9c4b5 100644
--- a/weed/shell/command_s3_bucket_delete.go
+++ b/weed/shell/command_s3_bucket_delete.go
@@ -28,6 +28,10 @@ func (c *commandS3BucketDelete) Help() string {
`
}
+func (c *commandS3BucketDelete) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go
index bf21a3a29..7939f2086 100644
--- a/weed/shell/command_s3_bucket_list.go
+++ b/weed/shell/command_s3_bucket_list.go
@@ -27,6 +27,10 @@ func (c *commandS3BucketList) Help() string {
`
}
+func (c *commandS3BucketList) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_s3_bucket_quota.go b/weed/shell/command_s3_bucket_quota.go
index d603c8156..3301a8d0e 100644
--- a/weed/shell/command_s3_bucket_quota.go
+++ b/weed/shell/command_s3_bucket_quota.go
@@ -28,6 +28,10 @@ func (c *commandS3BucketQuota) Help() string {
`
}
+func (c *commandS3BucketQuota) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandS3BucketQuota) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_s3_bucket_quota_check.go b/weed/shell/command_s3_bucket_quota_check.go
index b130e4fad..04708a0e7 100644
--- a/weed/shell/command_s3_bucket_quota_check.go
+++ b/weed/shell/command_s3_bucket_quota_check.go
@@ -29,6 +29,10 @@ func (c *commandS3BucketQuotaEnforce) Help() string {
`
}
+func (c *commandS3BucketQuotaEnforce) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_s3_circuitbreaker.go b/weed/shell/command_s3_circuitbreaker.go
index 2f326b079..5abc2d429 100644
--- a/weed/shell/command_s3_circuitbreaker.go
+++ b/weed/shell/command_s3_circuitbreaker.go
@@ -51,6 +51,10 @@ func (c *commandS3CircuitBreaker) Help() string {
`
}
+func (c *commandS3CircuitBreaker) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandS3CircuitBreaker) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
dir := s3_constants.CircuitBreakerConfigDir
file := s3_constants.CircuitBreakerConfigFile
diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go
index accce60ba..aa296dd67 100644
--- a/weed/shell/command_s3_clean_uploads.go
+++ b/weed/shell/command_s3_clean_uploads.go
@@ -34,6 +34,10 @@ func (c *commandS3CleanUploads) Help() string {
`
}
+func (c *commandS3CleanUploads) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"")
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index 33c5a4cfc..0726bd143 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -33,6 +33,10 @@ func (c *commandS3Configure) Help() string {
`
}
+func (c *commandS3Configure) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
s3ConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index e9a483f41..bea6f33c3 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -64,6 +64,10 @@ func (c *commandVolumeBalance) Help() string {
`
}
+func (c *commandVolumeBalance) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 0e76f6ac9..f72dff243 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -5,6 +5,12 @@ import (
"context"
"flag"
"fmt"
+ "io"
+ "math"
+ "net/http"
+ "sync"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -13,11 +19,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
- "io"
- "math"
- "net/http"
- "sync"
- "time"
)
func init() {
@@ -47,6 +48,10 @@ func (c *commandVolumeCheckDisk) Help() string {
`
}
+func (c *commandVolumeCheckDisk) HasTag(tag CommandTag) bool {
+ return tag == ResourceHeavy
+}
+
func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) {
err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
@@ -141,23 +146,35 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) {
continue
}
- slices.SortFunc(replicas, func(a, b *VolumeReplica) int {
+ // filter readonly replica
+ var writableReplicas []*VolumeReplica
+ for _, replica := range replicas {
+ if replica.info.ReadOnly {
+ fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id)
+ } else {
+ writableReplicas = append(writableReplicas, replica)
+ }
+ }
+
+ slices.SortFunc(writableReplicas, func(a, b *VolumeReplica) int {
return int(b.info.FileCount - a.info.FileCount)
})
- for len(replicas) >= 2 {
- a, b := replicas[0], replicas[1]
- replicas = replicas[1:]
- if a.info.ReadOnly || b.info.ReadOnly {
- fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n",
- a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
- continue
- }
+ for len(writableReplicas) >= 2 {
+ a, b := writableReplicas[0], writableReplicas[1]
if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) {
+ // always choose the larger volume to be the source
+ writableReplicas = append(replicas[:1], writableReplicas[2:]...)
continue
}
if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil {
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
}
+ // always choose the larger volume to be the source
+ if a.info.FileCount > b.info.FileCount {
+ writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
+ } else {
+ writableReplicas = writableReplicas[1:]
+ }
}
}
@@ -191,13 +208,15 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
}
// find and make up the differences
- if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
- return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err)
+ aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
+ bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
+ if err1 != nil {
+ return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1)
}
- if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
- return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
+ if err2 != nil {
+ return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2)
}
- return
+ return aHasChanges, bHasChanges, nil
}
func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) {
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index a6acd6838..29d7ebac4 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -35,6 +35,10 @@ func (c *commandVolumeConfigureReplication) Help() string {
`
}
+func (c *commandVolumeConfigureReplication) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, _ io.Writer) (err error) {
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go
index 59193f6bc..be00cb18f 100644
--- a/weed/shell/command_volume_copy.go
+++ b/weed/shell/command_volume_copy.go
@@ -31,6 +31,10 @@ func (c *commandVolumeCopy) Help() string {
`
}
+func (c *commandVolumeCopy) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volCopyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go
index 159981c93..719420bcc 100644
--- a/weed/shell/command_volume_delete.go
+++ b/weed/shell/command_volume_delete.go
@@ -29,6 +29,10 @@ func (c *commandVolumeDelete) Help() string {
`
}
+func (c *commandVolumeDelete) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_delete_empty.go b/weed/shell/command_volume_delete_empty.go
index bb0a194e0..fc1d50e64 100644
--- a/weed/shell/command_volume_delete_empty.go
+++ b/weed/shell/command_volume_delete_empty.go
@@ -32,6 +32,10 @@ func (c *commandVolumeDeleteEmpty) Help() string {
`
}
+func (c *commandVolumeDeleteEmpty) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 074931f40..2c35b3bcd 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -55,6 +55,10 @@ func (c *commandVolumeFixReplication) Help() string {
`
}
+func (c *commandVolumeFixReplication) HasTag(tag CommandTag) bool {
+ return false && tag == ResourceHeavy // resource intensive only when deleting and checking with replicas.
+}
+
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -99,7 +103,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
replica := replicas[0]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
switch {
- case replicaPlacement.GetCopyCount() > len(replicas):
+ case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas):
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
case isMisplaced(replicas, replicaPlacement):
misplacedVolumeIds = append(misplacedVolumeIds, vid)
@@ -377,6 +381,27 @@ func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
})
}
+func satisfyReplicaCurrentLocation(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica) bool {
+ existingDataCenters, existingRacks, _ := countReplicas(replicas)
+
+ if replicaPlacement.DiffDataCenterCount+1 > len(existingDataCenters) {
+ return false
+ }
+ if replicaPlacement.DiffRackCount+1 > len(existingRacks) {
+ return false
+ }
+ if replicaPlacement.SameRackCount > 0 {
+ foundSatisfyRack := false
+ for _, rackCount := range existingRacks {
+ if rackCount >= replicaPlacement.SameRackCount+1 {
+ foundSatisfyRack = true
+ }
+ }
+ return foundSatisfyRack
+ }
+ return true
+}
+
/*
if on an existing data node {
return false
diff --git a/weed/shell/command_volume_fix_replication_test.go b/weed/shell/command_volume_fix_replication_test.go
index 97f270306..5f2318c32 100644
--- a/weed/shell/command_volume_fix_replication_test.go
+++ b/weed/shell/command_volume_fix_replication_test.go
@@ -438,3 +438,90 @@ func TestPickingMisplacedVolumeToDelete(t *testing.T) {
}
}
+
+func TestSatisfyReplicaCurrentLocation(t *testing.T) {
+
+ var tests = []testcase{
+ {
+ name: "test 001",
+ replication: "001",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ expected: false,
+ },
+ {
+ name: "test 010",
+ replication: "010",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ expected: true,
+ },
+ {
+ name: "test 011",
+ replication: "011",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ },
+ expected: true,
+ },
+ {
+ name: "test 110",
+ replication: "110",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ {
+ location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ },
+ expected: true,
+ },
+ {
+ name: "test 100",
+ replication: "100",
+ replicas: []*VolumeReplica{
+ {
+ location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ {
+ location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ },
+ expected: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
+ if satisfyReplicaCurrentLocation(replicaPlacement, tt.replicas) != tt.expected {
+ t.Errorf("%s: expect %v %v %+v",
+ tt.name, tt.expected, tt.replication, tt.replicas)
+ }
+ })
+ }
+}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index acb0ee5ad..30d3ecd11 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -60,7 +60,7 @@ func (c *commandVolumeFsck) Name() string {
}
func (c *commandVolumeFsck) Help() string {
- return `check all volumes to find entries not used by the filer
+ return `check all volumes to find entries not used by the filer. It is optional and resource intensive.
Important assumption!!!
the system is all used by one filer.
@@ -79,6 +79,10 @@ func (c *commandVolumeFsck) Help() string {
`
}
+func (c *commandVolumeFsck) HasTag(tag CommandTag) bool {
+ return tag == ResourceHeavy
+}
+
func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -359,12 +363,12 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
needleVID := needle.VolumeId(volumeId)
if isReadOnlyReplicas[volumeId] {
- err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true)
+ err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true, false)
if err != nil {
return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
}
fmt.Fprintf(c.writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
- defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false)
+ defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false, false)
fmt.Fprintf(c.writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
}
diff --git a/weed/shell/command_volume_grow.go b/weed/shell/command_volume_grow.go
index 026a59b7b..5b351e6ab 100644
--- a/weed/shell/command_volume_grow.go
+++ b/weed/shell/command_volume_grow.go
@@ -30,6 +30,10 @@ func (c *commandGrow) Help() string {
`
}
+func (c *commandGrow) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandGrow) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go
index 83e3a0d6f..1e5238f39 100644
--- a/weed/shell/command_volume_list.go
+++ b/weed/shell/command_volume_list.go
@@ -39,6 +39,10 @@ func (c *commandVolumeList) Help() string {
`
}
+func (c *commandVolumeList) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volumeListCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_mark.go b/weed/shell/command_volume_mark.go
index 1716cf9a6..9bc35e6c1 100644
--- a/weed/shell/command_volume_mark.go
+++ b/weed/shell/command_volume_mark.go
@@ -3,9 +3,10 @@ package shell
import (
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"io"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@@ -27,6 +28,10 @@ func (c *commandVolumeMark) Help() string {
`
}
+func (c *commandVolumeMark) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volMarkCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -52,5 +57,5 @@ func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.
volumeId := needle.VolumeId(*volumeIdInt)
- return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable)
+ return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable, true)
}
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
index e4bad222e..0b62905de 100644
--- a/weed/shell/command_volume_mount.go
+++ b/weed/shell/command_volume_mount.go
@@ -33,6 +33,10 @@ func (c *commandVolumeMount) Help() string {
`
}
+func (c *commandVolumeMount) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 97eaf211e..cf9991695 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -4,12 +4,13 @@ import (
"context"
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
"io"
"log"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
+
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@@ -48,6 +49,10 @@ func (c *commandVolumeMove) Help() string {
`
}
+func (c *commandVolumeMove) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volMoveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -132,6 +137,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
shouldMarkWritable = true
_, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
+ Persist: false,
})
return readonlyErr
}
@@ -197,7 +203,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
})
}
-func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) {
+func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable, persist bool) (err error) {
return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if writable {
_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
@@ -206,16 +212,17 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
} else {
_, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
+ Persist: persist,
})
}
return err
})
}
-func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error {
+func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error {
for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
- if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable); err != nil {
+ if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil {
return err
}
}
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index bad695cd7..3b593d9be 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -46,6 +46,10 @@ func (c *commandVolumeServerEvacuate) Help() string {
`
}
+func (c *commandVolumeServerEvacuate) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go
index d0dd023af..e7c979cad 100644
--- a/weed/shell/command_volume_server_leave.go
+++ b/weed/shell/command_volume_server_leave.go
@@ -35,6 +35,10 @@ func (c *commandVolumeServerLeave) Help() string {
`
}
+func (c *commandVolumeServerLeave) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
vsLeaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go
index e60a74735..2878d16fe 100644
--- a/weed/shell/command_volume_tier_download.go
+++ b/weed/shell/command_volume_tier_download.go
@@ -41,6 +41,10 @@ func (c *commandVolumeTierDownload) Help() string {
`
}
+func (c *commandVolumeTierDownload) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index c6364757f..d5087f0ec 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -53,6 +53,10 @@ func (c *commandVolumeTierMove) Help() string {
`
}
+func (c *commandVolumeTierMove) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -234,14 +238,14 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
}
// mark all replicas as read only
- if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
+ if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
newAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable
- if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
+ if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true, false); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
}
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
index cb805b0cf..5ef1c5209 100644
--- a/weed/shell/command_volume_tier_upload.go
+++ b/weed/shell/command_volume_tier_upload.go
@@ -4,10 +4,11 @@ import (
"context"
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -55,6 +56,10 @@ func (c *commandVolumeTierUpload) Help() string {
`
}
+func (c *commandVolumeTierUpload) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -102,7 +107,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
return fmt.Errorf("volume %d not found", vid)
}
- err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false)
+ err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false, false)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, existingLocations[0].Url, err)
}
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
index 4b079b944..ccb83bbae 100644
--- a/weed/shell/command_volume_unmount.go
+++ b/weed/shell/command_volume_unmount.go
@@ -34,6 +34,10 @@ func (c *commandVolumeUnmount) Help() string {
`
}
+func (c *commandVolumeUnmount) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volUnmountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go
index eb95e3d3d..e55bf1ec9 100644
--- a/weed/shell/command_volume_vacuum.go
+++ b/weed/shell/command_volume_vacuum.go
@@ -27,6 +27,10 @@ func (c *commandVacuum) Help() string {
`
}
+func (c *commandVacuum) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
diff --git a/weed/shell/command_volume_vacuum_disable.go b/weed/shell/command_volume_vacuum_disable.go
index ddae744e5..15897ddb0 100644
--- a/weed/shell/command_volume_vacuum_disable.go
+++ b/weed/shell/command_volume_vacuum_disable.go
@@ -26,6 +26,10 @@ func (c *commandDisableVacuum) Help() string {
`
}
+func (c *commandDisableVacuum) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandDisableVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if err = commandEnv.confirmIsLocked(args); err != nil {
diff --git a/weed/shell/command_volume_vacuum_enable.go b/weed/shell/command_volume_vacuum_enable.go
index 03284c92f..b8f779b14 100644
--- a/weed/shell/command_volume_vacuum_enable.go
+++ b/weed/shell/command_volume_vacuum_enable.go
@@ -26,6 +26,10 @@ func (c *commandEnableVacuum) Help() string {
`
}
+func (c *commandEnableVacuum) HasTag(CommandTag) bool {
+ return false
+}
+
func (c *commandEnableVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if err = commandEnv.confirmIsLocked(args); err != nil {
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index e6e582376..582ee560d 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -6,7 +6,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
- "io"
"net/url"
"strconv"
"strings"
@@ -38,16 +37,6 @@ type CommandEnv struct {
locker *exclusive_locks.ExclusiveLocker
}
-type command interface {
- Name() string
- Help() string
- Do([]string, *CommandEnv, io.Writer) error
-}
-
-var (
- Commands = []command{}
-)
-
func NewCommandEnv(options *ShellOptions) *CommandEnv {
ce := &CommandEnv{
env: make(map[string]string),