aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
authorguosj <515878133@qq.com>2022-04-19 09:25:32 +0800
committerguosj <515878133@qq.com>2022-04-19 09:25:32 +0800
commit82ee31965dd7a1ad2d348c7e9dadb254744bf9b0 (patch)
tree593eb933dffc877010c761b2c55ec6c73875e9a3 /weed/shell
parent5c9a3bb8cf68ed99acb53dd548c92b54744d7fd7 (diff)
parente6ebafc094dc0ce0e3b0a68d7735f52a544bc479 (diff)
downloadseaweedfs-82ee31965dd7a1ad2d348c7e9dadb254744bf9b0.tar.xz
seaweedfs-82ee31965dd7a1ad2d348c7e9dadb254744bf9b0.zip
Merge branch 'master' of https://github.com/chrislusf/seaweedfs into chrislusf-master
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_cluster_check.go248
-rw-r--r--weed/shell/command_cluster_raft_add.go59
-rw-r--r--weed/shell/command_cluster_raft_ps.go51
-rw-r--r--weed/shell/command_cluster_raft_remove.go56
-rw-r--r--weed/shell/command_ec_balance.go15
-rw-r--r--weed/shell/command_ec_common.go17
-rw-r--r--weed/shell/command_ec_encode.go2
-rw-r--r--weed/shell/command_fs_meta_cat.go15
-rw-r--r--weed/shell/command_mount_configure.go64
-rw-r--r--weed/shell/command_s3_clean_uploads.go18
-rw-r--r--weed/shell/command_volume_balance.go15
-rw-r--r--weed/shell/command_volume_check_disk.go6
-rw-r--r--weed/shell/command_volume_fix_replication.go10
-rw-r--r--weed/shell/command_volume_fsck.go20
-rw-r--r--weed/shell/command_volume_list.go18
-rw-r--r--weed/shell/command_volume_server_evacuate.go13
-rw-r--r--weed/shell/command_volume_vacuum.go6
-rw-r--r--weed/shell/commands.go2
-rw-r--r--weed/shell/shell_liner.go8
19 files changed, 571 insertions, 72 deletions
diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go
new file mode 100644
index 000000000..616669b6d
--- /dev/null
+++ b/weed/shell/command_cluster_check.go
@@ -0,0 +1,248 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandClusterCheck{})
+}
+
+type commandClusterCheck struct {
+}
+
+func (c *commandClusterCheck) Name() string {
+ return "cluster.check"
+}
+
+func (c *commandClusterCheck) Help() string {
+ return `check current cluster network connectivity
+
+ cluster.check
+
+`
+}
+
+func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ if err = clusterPsCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ // collect topology information
+ topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
+ if err != nil {
+ return err
+ }
+ fmt.Fprintf(writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(topologyInfo.DiskInfos))
+
+ emptyDiskTypeDiskInfo, emptyDiskTypeFound := topologyInfo.DiskInfos[""]
+ hddDiskTypeDiskInfo, hddDiskTypeFound := topologyInfo.DiskInfos["hdd"]
+ if !emptyDiskTypeFound && !hddDiskTypeFound {
+ return fmt.Errorf("Need to a hdd disk type!")
+ }
+ if emptyDiskTypeFound && emptyDiskTypeDiskInfo.VolumeCount == 0 || hddDiskTypeFound && hddDiskTypeDiskInfo.VolumeCount == 0 {
+ return fmt.Errorf("Need to a hdd disk type!")
+ }
+
+ // collect filers
+ var filers []pb.ServerAddress
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
+ ClientType: cluster.FilerType,
+ })
+
+ for _, node := range resp.ClusterNodes {
+ filers = append(filers, pb.ServerAddress(node.Address))
+ }
+ return err
+ })
+ if err != nil {
+ return
+ }
+ fmt.Fprintf(writer, "the cluster has %d filers: %+v\n", len(filers), filers)
+
+ // collect volume servers
+ var volumeServers []pb.ServerAddress
+ t, _, err := collectTopologyInfo(commandEnv, 0)
+ if err != nil {
+ return err
+ }
+ for _, dc := range t.DataCenterInfos {
+ for _, r := range dc.RackInfos {
+ for _, dn := range r.DataNodeInfos {
+ volumeServers = append(volumeServers, pb.NewServerAddressFromDataNode(dn))
+ }
+ }
+ }
+ fmt.Fprintf(writer, "the cluster has %d volume servers: %+v\n", len(volumeServers), volumeServers)
+
+ // collect all masters
+ var masters []pb.ServerAddress
+ for _, master := range commandEnv.MasterClient.GetMasters() {
+ masters = append(masters, master)
+ }
+
+ // check from master to volume servers
+ for _, master := range masters {
+ for _, volumeServer := range volumeServers {
+ fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer))
+ err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
+ Target: string(volumeServer),
+ TargetType: cluster.VolumeServerType,
+ })
+ if err == nil {
+ printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+ }
+ return err
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "%v\n", err)
+ }
+ }
+ }
+
+ // check between masters
+ for _, sourceMaster := range masters {
+ for _, targetMaster := range masters {
+ if sourceMaster == targetMaster {
+ continue
+ }
+ fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster))
+ err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
+ Target: string(targetMaster),
+ TargetType: cluster.MasterType,
+ })
+ if err == nil {
+ printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+ }
+ return err
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "%v\n", err)
+ }
+ }
+ }
+
+ // check from volume servers to masters
+ for _, volumeServer := range volumeServers {
+ for _, master := range masters {
+ fmt.Fprintf(writer, "checking volume server %s to master %s ... ", string(volumeServer), string(master))
+ err := pb.WithVolumeServerClient(false, volumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
+ Target: string(master),
+ TargetType: cluster.MasterType,
+ })
+ if err == nil {
+ printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+ }
+ return err
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "%v\n", err)
+ }
+ }
+ }
+
+ // check from filers to masters
+ for _, filer := range filers {
+ for _, master := range masters {
+ fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master))
+ err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
+ Target: string(master),
+ TargetType: cluster.MasterType,
+ })
+ if err == nil {
+ printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+ }
+ return err
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "%v\n", err)
+ }
+ }
+ }
+
+ // check from filers to volume servers
+ for _, filer := range filers {
+ for _, volumeServer := range volumeServers {
+ fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer))
+ err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
+ Target: string(volumeServer),
+ TargetType: cluster.VolumeServerType,
+ })
+ if err == nil {
+ printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+ }
+ return err
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "%v\n", err)
+ }
+ }
+ }
+
+ // check between volume servers
+ for _, sourceVolumeServer := range volumeServers {
+ for _, targetVolumeServer := range volumeServers {
+ if sourceVolumeServer == targetVolumeServer {
+ continue
+ }
+ fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer))
+ err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
+ Target: string(targetVolumeServer),
+ TargetType: cluster.VolumeServerType,
+ })
+ if err == nil {
+ printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+ }
+ return err
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "%v\n", err)
+ }
+ }
+ }
+
+ // check between filers, and need to connect to itself
+ for _, sourceFiler := range filers {
+ for _, targetFiler := range filers {
+ fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler))
+ err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
+ Target: string(targetFiler),
+ TargetType: cluster.FilerType,
+ })
+ if err == nil {
+ printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+ }
+ return err
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "%v\n", err)
+ }
+ }
+ }
+
+ return nil
+}
+
+func printTiming(writer io.Writer, startNs, remoteNs, stopNs int64) {
+ roundTripTimeMs := float32(stopNs-startNs) / 1000000
+ deltaTimeMs := float32(remoteNs-(startNs+stopNs)/2) / 1000000
+ fmt.Fprintf(writer, "ok round trip %.3fms clock delta %.3fms\n", roundTripTimeMs, deltaTimeMs)
+}
diff --git a/weed/shell/command_cluster_raft_add.go b/weed/shell/command_cluster_raft_add.go
new file mode 100644
index 000000000..e5f3c41c9
--- /dev/null
+++ b/weed/shell/command_cluster_raft_add.go
@@ -0,0 +1,59 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandRaftServerAdd{})
+}
+
+type commandRaftServerAdd struct {
+}
+
+func (c *commandRaftServerAdd) Name() string {
+ return "cluster.raft.add"
+}
+
+func (c *commandRaftServerAdd) Help() string {
+ return `add a server to the raft cluster
+
+ Example:
+ cluster.raft.add -id <server_name> -address <server_host:port> -voter
+`
+}
+
+func (c *commandRaftServerAdd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ serverId := raftServerAddCommand.String("id", "", "server id")
+ serverAddress := raftServerAddCommand.String("address", "", "server grpc address")
+ serverVoter := raftServerAddCommand.Bool("voter", true, "assign it a vote")
+ if err = raftServerAddCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *serverId == "" || *serverAddress == "" {
+ return fmt.Errorf("empty server id or address")
+ }
+
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.RaftAddServer(context.Background(), &master_pb.RaftAddServerRequest{
+ Id: *serverId,
+ Address: *serverAddress,
+ Voter: *serverVoter,
+ })
+ if err != nil {
+ return fmt.Errorf("raft add server: %v", err)
+ }
+ println("added server", *serverId)
+ return nil
+ })
+
+ return err
+
+}
diff --git a/weed/shell/command_cluster_raft_ps.go b/weed/shell/command_cluster_raft_ps.go
new file mode 100644
index 000000000..ea868db06
--- /dev/null
+++ b/weed/shell/command_cluster_raft_ps.go
@@ -0,0 +1,51 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandRaftClusterPs{})
+}
+
+type commandRaftClusterPs struct {
+}
+
+func (c *commandRaftClusterPs) Name() string {
+ return "cluster.raft.ps"
+}
+
+func (c *commandRaftClusterPs) Help() string {
+ return `check current raft cluster status
+
+ cluster.raft.ps
+`
+}
+
+func (c *commandRaftClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ raftClusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ if err = raftClusterPsCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{})
+ if err != nil {
+ return fmt.Errorf("raft list cluster: %v", err)
+ }
+ fmt.Fprintf(writer, "the raft cluster has %d servers\n", len(resp.ClusterServers))
+ for _, server := range resp.ClusterServers {
+ fmt.Fprintf(writer, " * %s %s (%s)\n", server.Id, server.Address, server.Suffrage)
+ }
+
+ return nil
+ })
+
+ return err
+
+}
diff --git a/weed/shell/command_cluster_raft_remove.go b/weed/shell/command_cluster_raft_remove.go
new file mode 100644
index 000000000..532a1469c
--- /dev/null
+++ b/weed/shell/command_cluster_raft_remove.go
@@ -0,0 +1,56 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandRaftServerRemove{})
+}
+
+type commandRaftServerRemove struct {
+}
+
+func (c *commandRaftServerRemove) Name() string {
+ return "cluster.raft.remove"
+}
+
+func (c *commandRaftServerRemove) Help() string {
+ return `remove a server from the raft cluster
+
+ Example:
+ cluster.raft.remove -id <server_name>
+`
+}
+
+func (c *commandRaftServerRemove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ serverId := raftServerAddCommand.String("id", "", "server id")
+ if err = raftServerAddCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *serverId == "" {
+ return fmt.Errorf("empty server id")
+ }
+
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
+ Id: *serverId,
+ Force: true,
+ })
+ if err != nil {
+ return fmt.Errorf("raft remove server: %v", err)
+ }
+ println("removed server", *serverId)
+ return nil
+ })
+
+ return err
+
+}
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 6cd91119b..393d44b80 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -4,12 +4,11 @@ import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
- "io"
- "sort"
-
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "golang.org/x/exp/slices"
+ "io"
)
func init() {
@@ -411,8 +410,8 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
hasMove := true
for hasMove {
hasMove = false
- sort.Slice(rackEcNodes, func(i, j int) bool {
- return rackEcNodes[i].freeEcSlot > rackEcNodes[j].freeEcSlot
+ slices.SortFunc(rackEcNodes, func(a, b *EcNode) bool {
+ return a.freeEcSlot > b.freeEcSlot
})
emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
@@ -492,8 +491,8 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
})
}
}
- sort.Slice(candidateEcNodes, func(i, j int) bool {
- return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
+ slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) bool {
+ return a.shardCount > b.shardCount
})
for i := 0; i < n; i++ {
selectedEcNodeIndex := -1
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index b3bd0ce5d..27b650731 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,18 +3,17 @@ package shell
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
- "math"
- "sort"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "golang.org/x/exp/slices"
"google.golang.org/grpc"
+ "math"
)
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
@@ -116,14 +115,14 @@ func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId,
}
func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) {
- sort.Slice(ecNodes, func(i, j int) bool {
- return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
+ slices.SortFunc(ecNodes, func(a, b *EcNode) bool {
+ return a.freeEcSlot > b.freeEcSlot
})
}
func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
- sort.Slice(ecNodes, func(i, j int) bool {
- return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot
+ slices.SortFunc(ecNodes, func(a, b *EcNode) bool {
+ return a.freeEcSlot < b.freeEcSlot
})
}
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 9ae9b049c..251448908 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -95,7 +95,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
- if !found {
+ if !found && len(locations) > 0 {
return fmt.Errorf("volume %d not found", vid)
}
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index a7de6d3ef..4616c072d 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -2,11 +2,10 @@ package shell
import (
"fmt"
+ "github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
+ "golang.org/x/exp/slices"
"io"
- "sort"
-
- "github.com/golang/protobuf/jsonpb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -55,14 +54,12 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
EmitDefaults: true,
Indent: " ",
}
-
- sort.Slice(respLookupEntry.Entry.Chunks, func(i, j int) bool {
- if respLookupEntry.Entry.Chunks[i].Offset == respLookupEntry.Entry.Chunks[j].Offset {
- return respLookupEntry.Entry.Chunks[i].Mtime < respLookupEntry.Entry.Chunks[j].Mtime
+ slices.SortFunc(respLookupEntry.Entry.Chunks, func(a, b *filer_pb.FileChunk) bool {
+ if a.Offset == b.Offset {
+ return a.Mtime < b.Mtime
}
- return respLookupEntry.Entry.Chunks[i].Offset < respLookupEntry.Entry.Chunks[j].Offset
+ return a.Offset < b.Offset
})
-
text, marshalErr := m.MarshalToString(respLookupEntry.Entry)
if marshalErr != nil {
return fmt.Errorf("marshal meta: %v", marshalErr)
diff --git a/weed/shell/command_mount_configure.go b/weed/shell/command_mount_configure.go
new file mode 100644
index 000000000..8c268d35c
--- /dev/null
+++ b/weed/shell/command_mount_configure.go
@@ -0,0 +1,64 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ _ "google.golang.org/grpc/resolver/passthrough"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandMountConfigure{})
+}
+
+type commandMountConfigure struct {
+}
+
+func (c *commandMountConfigure) Name() string {
+ return "mount.configure"
+}
+
+func (c *commandMountConfigure) Help() string {
+ return `configure the mount on current server
+
+ mount.configure -dir=<mount_directory>
+
+ This command connects with local mount via unix socket, so it can only run locally.
+ The "mount_directory" value needs to be exactly the same as how mount was started in "weed mount -dir=<mount_directory>"
+
+`
+}
+
+func (c *commandMountConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ mountConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ mountDir := mountConfigureCommand.String("dir", "", "the mount directory same as how \"weed mount -dir=<mount_directory>\" was started")
+ mountQuota := mountConfigureCommand.Int("quotaMB", 0, "the quota in MB")
+ if err = mountConfigureCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ mountDirHash := util.HashToInt32([]byte(*mountDir))
+ if mountDirHash < 0 {
+ mountDirHash = -mountDirHash
+ }
+ localSocket := fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash)
+
+ clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ return
+ }
+ defer clientConn.Close()
+
+ client := mount_pb.NewSeaweedMountClient(clientConn)
+ _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
+ CollectionCapacity: int64(*mountQuota) * 1024 * 1024,
+ })
+
+ return
+}
diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go
index 4f893df7a..a6dc8f574 100644
--- a/weed/shell/command_s3_clean_uploads.go
+++ b/weed/shell/command_s3_clean_uploads.go
@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"math"
@@ -39,6 +40,8 @@ func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
+ signingKey := util.GetViper().GetString("jwt.signing.key")
+
var filerBucketsPath string
filerBucketsPath, err = readFilerBucketsPath(commandEnv)
if err != nil {
@@ -55,14 +58,16 @@ func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer
}
for _, bucket := range buckets {
- c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo)
+ if err := c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo, signingKey); err != nil {
+ fmt.Fprintf(writer, fmt.Sprintf("failed cleanup uploads for backet %s: %v", bucket, err))
+ }
}
return err
}
-func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error {
+func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration, signingKey string) error {
uploadsDir := filerBucketsPath + "/" + bucket + "/.uploads"
var staleUploads []string
now := time.Now()
@@ -77,12 +82,17 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io
return fmt.Errorf("list uploads under %v: %v", uploadsDir, err)
}
+ var encodedJwt security.EncodedJwt
+ if signingKey != "" {
+ encodedJwt = security.GenJwtForFilerServer(security.SigningKey(signingKey), 15*60)
+ }
+
for _, staleUpload := range staleUploads {
deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload)
fmt.Fprintf(writer, "purge %s\n", deleteUrl)
- err = util.Delete(deleteUrl, "")
- if err != nil {
+ err = util.Delete(deleteUrl, string(encodedJwt))
+ if err != nil && err.Error() != "" {
return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
}
}
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 7a983de1a..b01d348c5 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -6,9 +6,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "golang.org/x/exp/slices"
"io"
"os"
- "sort"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -224,14 +224,14 @@ func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool
}
func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
- sort.Slice(volumes, func(i, j int) bool {
- return volumes[i].Size < volumes[j].Size
+ slices.SortFunc(volumes, func(a, b *master_pb.VolumeInformationMessage) bool {
+ return a.Size < b.Size
})
}
func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
- sort.Slice(volumes, func(i, j int) bool {
- return volumes[i].Id < volumes[j].Id
+ slices.SortFunc(volumes, func(a, b *master_pb.VolumeInformationMessage) bool {
+ return a.Id < b.Id
})
}
@@ -255,10 +255,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
for hasMoved {
hasMoved = false
- sort.Slice(nodesWithCapacity, func(i, j int) bool {
- return nodesWithCapacity[i].localVolumeRatio(capacityFunc) < nodesWithCapacity[j].localVolumeRatio(capacityFunc)
+ slices.SortFunc(nodesWithCapacity, func(a, b *Node) bool {
+ return a.localVolumeRatio(capacityFunc) < b.localVolumeRatio(capacityFunc)
})
-
fullNode := nodesWithCapacity[len(nodesWithCapacity)-1]
var candidateVolumes []*master_pb.VolumeInformationMessage
for _, v := range fullNode.selectedVolumes {
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 54edd53dd..53284096d 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -9,9 +9,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "golang.org/x/exp/slices"
"io"
"math"
- "sort"
)
func init() {
@@ -70,8 +70,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
}
for _, replicas := range volumeReplicas {
- sort.Slice(replicas, func(i, j int) bool {
- return fileCount(replicas[i]) > fileCount(replicas[j])
+ slices.SortFunc(replicas, func(a, b *VolumeReplica) bool {
+ return fileCount(a) > fileCount(b)
})
for len(replicas) >= 2 {
a, b := replicas[0], replicas[1]
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 78285d8a5..c4bef5925 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -7,9 +7,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "golang.org/x/exp/slices"
"io"
"path/filepath"
- "sort"
"strconv"
"time"
@@ -308,8 +308,8 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
fn := capacityByFreeVolumeCount(diskType)
- sort.Slice(dataNodes, func(i, j int) bool {
- return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode)
+ slices.SortFunc(dataNodes, func(a, b location) bool {
+ return fn(a.dataNode) > fn(b.dataNode)
})
}
@@ -488,9 +488,7 @@ func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[st
}
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
-
- sort.Slice(replicas, func(i, j int) bool {
- a, b := replicas[i], replicas[j]
+ slices.SortFunc(replicas, func(a, b *VolumeReplica) bool {
if a.info.Size != b.info.Size {
return a.info.Size < b.info.Size
}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 1b3d7bf0d..7d3aa28a5 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -12,6 +12,7 @@ import (
"net/url"
"os"
"path/filepath"
+ "strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -92,8 +93,27 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("failed to collect all volume locations: %v", err)
}
+ isBucketsPath := false
+ var fillerBucketsPath string
+ if *findMissingChunksInFiler && *findMissingChunksInFilerPath != "" {
+ fillerBucketsPath, err = readFilerBucketsPath(commandEnv)
+ if err != nil {
+ return fmt.Errorf("read filer buckets path: %v", err)
+ }
+ if strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath) {
+ isBucketsPath = true
+ }
+ }
+ if err != nil {
+ return fmt.Errorf("read filer buckets path: %v", err)
+ }
+
// collect each volume file ids
for volumeId, vinfo := range volumeIdToVInfo {
+ if isBucketsPath && !strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath+"/"+vinfo.collection) {
+ delete(volumeIdToVInfo, volumeId)
+ continue
+ }
err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go
index 4c0429ecb..9150752d5 100644
--- a/weed/shell/command_volume_list.go
+++ b/weed/shell/command_volume_list.go
@@ -6,9 +6,9 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "golang.org/x/exp/slices"
"io"
- "sort"
)
func init() {
@@ -67,8 +67,8 @@ func diskInfoToString(diskInfo *master_pb.DiskInfo) string {
func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLimitMb uint64, verbosityLevel int) statistics {
output(verbosityLevel >= 0, writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(t.DiskInfos))
- sort.Slice(t.DataCenterInfos, func(i, j int) bool {
- return t.DataCenterInfos[i].Id < t.DataCenterInfos[j].Id
+ slices.SortFunc(t.DataCenterInfos, func(a, b *master_pb.DataCenterInfo) bool {
+ return a.Id < b.Id
})
var s statistics
for _, dc := range t.DataCenterInfos {
@@ -80,8 +80,8 @@ func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLi
func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo, verbosityLevel int) statistics {
output(verbosityLevel >= 1, writer, " DataCenter %s%s\n", t.Id, diskInfosToString(t.DiskInfos))
var s statistics
- sort.Slice(t.RackInfos, func(i, j int) bool {
- return t.RackInfos[i].Id < t.RackInfos[j].Id
+ slices.SortFunc(t.RackInfos, func(a, b *master_pb.RackInfo) bool {
+ return a.Id < b.Id
})
for _, r := range t.RackInfos {
s = s.plus(writeRackInfo(writer, r, verbosityLevel))
@@ -92,8 +92,8 @@ func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo, verbosit
func writeRackInfo(writer io.Writer, t *master_pb.RackInfo, verbosityLevel int) statistics {
output(verbosityLevel >= 2, writer, " Rack %s%s\n", t.Id, diskInfosToString(t.DiskInfos))
var s statistics
- sort.Slice(t.DataNodeInfos, func(i, j int) bool {
- return t.DataNodeInfos[i].Id < t.DataNodeInfos[j].Id
+ slices.SortFunc(t.DataNodeInfos, func(a, b *master_pb.DataNodeInfo) bool {
+ return a.Id < b.Id
})
for _, dn := range t.DataNodeInfos {
s = s.plus(writeDataNodeInfo(writer, dn, verbosityLevel))
@@ -118,8 +118,8 @@ func writeDiskInfo(writer io.Writer, t *master_pb.DiskInfo, verbosityLevel int)
diskType = "hdd"
}
output(verbosityLevel >= 4, writer, " Disk %s(%s)\n", diskType, diskInfoToString(t))
- sort.Slice(t.VolumeInfos, func(i, j int) bool {
- return t.VolumeInfos[i].Id < t.VolumeInfos[j].Id
+ slices.SortFunc(t.VolumeInfos, func(a, b *master_pb.VolumeInformationMessage) bool {
+ return a.Id < b.Id
})
for _, vi := range t.VolumeInfos {
s = s.plus(writeVolumeInformationMessage(writer, vi, verbosityLevel))
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index 31ebcfec1..f07ea4b79 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -8,9 +8,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "golang.org/x/exp/slices"
"io"
"os"
- "sort"
)
func init() {
@@ -153,11 +153,9 @@ func evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyI
func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
-
- sort.Slice(otherNodes, func(i, j int) bool {
- return otherNodes[i].localShardIdCount(ecShardInfo.Id) < otherNodes[j].localShardIdCount(ecShardInfo.Id)
+ slices.SortFunc(otherNodes, func(a, b *EcNode) bool {
+ return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id)
})
-
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
collectionPrefix := ""
@@ -188,10 +186,9 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][
return v.DiskType == vol.DiskType
})
}
- sort.Slice(otherNodes, func(i, j int) bool {
- return otherNodes[i].localVolumeRatio(fn) > otherNodes[j].localVolumeRatio(fn)
+ slices.SortFunc(otherNodes, func(a, b *Node) bool {
+ return a.localVolumeRatio(fn) > b.localVolumeRatio(fn)
})
-
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go
index 61b1f06fa..fc454c9ff 100644
--- a/weed/shell/command_volume_vacuum.go
+++ b/weed/shell/command_volume_vacuum.go
@@ -22,7 +22,7 @@ func (c *commandVacuum) Name() string {
func (c *commandVacuum) Help() string {
return `compact volumes if deleted entries are more than the limit
- volume.vacuum [-garbageThreshold=0.3]
+ volume.vacuum [-garbageThreshold=0.3] [-collection=<collection name>] [-volumeId=<volume id>]
`
}
@@ -31,6 +31,8 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ
volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
garbageThreshold := volumeVacuumCommand.Float64("garbageThreshold", 0.3, "vacuum when garbage is more than this limit")
+ collection := volumeVacuumCommand.String("collection", "", "vacuum this collection")
+ volumeId := volumeVacuumCommand.Uint("volumeId", 0, "the volume id")
if err = volumeVacuumCommand.Parse(args); err != nil {
return
}
@@ -42,6 +44,8 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
GarbageThreshold: float32(*garbageThreshold),
+ VolumeId: uint32(*volumeId),
+ Collection: *collection,
})
return err
})
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index ec71edee0..3ff49f1d2 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -46,7 +46,7 @@ var (
func NewCommandEnv(options *ShellOptions) *CommandEnv {
ce := &CommandEnv{
env: make(map[string]string),
- MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()),
+ MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()),
option: options,
}
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "admin")
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index 90ce2dbb4..94a68f5bc 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -8,12 +8,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util/grace"
+ "golang.org/x/exp/slices"
"io"
"math/rand"
"os"
"path"
"regexp"
- "sort"
"strings"
"github.com/peterh/liner"
@@ -25,11 +25,9 @@ var (
)
func RunShell(options ShellOptions) {
-
- sort.Slice(Commands, func(i, j int) bool {
- return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0
+ slices.SortFunc(Commands, func(a, b command) bool {
+ return strings.Compare(a.Name(), b.Name()) < 0
})
-
line = liner.NewLiner()
defer line.Close()
grace.OnInterrupt(func() {