diff options
| author | a <eddy@gfxlabs.io> | 2022-04-12 14:42:29 -0700 |
|---|---|---|
| committer | a <eddy@gfxlabs.io> | 2022-04-12 14:42:29 -0700 |
| commit | 846858fb436cc061c40c4f2565ed3682e3758596 (patch) | |
| tree | 28984dd19b8cdb4ddb41a4a4283cb4b6644b37a4 /weed/shell | |
| parent | 41d396edc4a8cdd586e9e58cab7b725c070ca685 (diff) | |
| parent | 42fea7e7d6ce0aca8474c9beba27d33f15bd4f49 (diff) | |
| download | seaweedfs-846858fb436cc061c40c4f2565ed3682e3758596.tar.xz seaweedfs-846858fb436cc061c40c4f2565ed3682e3758596.zip | |
merge master
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_cluster_check.go | 5 | ||||
| -rw-r--r-- | weed/shell/command_cluster_raft_add.go | 59 | ||||
| -rw-r--r-- | weed/shell/command_cluster_raft_ps.go | 51 | ||||
| -rw-r--r-- | weed/shell/command_cluster_raft_remove.go | 56 | ||||
| -rw-r--r-- | weed/shell/command_s3_clean_uploads.go | 18 |
5 files changed, 184 insertions, 5 deletions
diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go index f841bd224..c8fe110ba 100644 --- a/weed/shell/command_cluster_check.go +++ b/weed/shell/command_cluster_check.go @@ -48,7 +48,10 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i emptyDiskTypeDiskInfo, emptyDiskTypeFound := topologyInfo.DiskInfos[""] hddDiskTypeDiskInfo, hddDiskTypeFound := topologyInfo.DiskInfos["hdd"] - if !emptyDiskTypeFound && !hddDiskTypeFound || emptyDiskTypeDiskInfo.VolumeCount == 0 && hddDiskTypeDiskInfo.VolumeCount == 0 { + if !emptyDiskTypeFound && !hddDiskTypeFound { + return fmt.Errorf("Need to a hdd disk type!") + } + if emptyDiskTypeFound && emptyDiskTypeDiskInfo.VolumeCount == 0 || hddDiskTypeFound && hddDiskTypeDiskInfo.VolumeCount == 0 { return fmt.Errorf("Need to a hdd disk type!") } diff --git a/weed/shell/command_cluster_raft_add.go b/weed/shell/command_cluster_raft_add.go new file mode 100644 index 000000000..e5f3c41c9 --- /dev/null +++ b/weed/shell/command_cluster_raft_add.go @@ -0,0 +1,59 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "io" +) + +func init() { + Commands = append(Commands, &commandRaftServerAdd{}) +} + +type commandRaftServerAdd struct { +} + +func (c *commandRaftServerAdd) Name() string { + return "cluster.raft.add" +} + +func (c *commandRaftServerAdd) Help() string { + return `add a server to the raft cluster + + Example: + cluster.raft.add -id <server_name> -address <server_host:port> -voter +` +} + +func (c *commandRaftServerAdd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + serverId := raftServerAddCommand.String("id", "", "server id") + serverAddress := raftServerAddCommand.String("address", "", "server grpc address") + serverVoter := raftServerAddCommand.Bool("voter", true, "assign it a vote") + if err = raftServerAddCommand.Parse(args); err != nil { + return nil + } + + if *serverId == "" || *serverAddress == "" { + return fmt.Errorf("empty server id or address") + } + + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftAddServer(context.Background(), &master_pb.RaftAddServerRequest{ + Id: *serverId, + Address: *serverAddress, + Voter: *serverVoter, + }) + if err != nil { + return fmt.Errorf("raft add server: %v", err) + } + println("added server", *serverId) + return nil + }) + + return err + +} diff --git a/weed/shell/command_cluster_raft_ps.go b/weed/shell/command_cluster_raft_ps.go new file mode 100644 index 000000000..ea868db06 --- /dev/null +++ b/weed/shell/command_cluster_raft_ps.go @@ -0,0 +1,51 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "io" +) + +func init() { + Commands = append(Commands, &commandRaftClusterPs{}) +} + +type commandRaftClusterPs struct { +} + +func (c *commandRaftClusterPs) Name() string { + return "cluster.raft.ps" +} + +func (c *commandRaftClusterPs) Help() string { + return `check current raft cluster status + + cluster.raft.ps +` +} + +func (c *commandRaftClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + raftClusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + if err = raftClusterPsCommand.Parse(args); err != nil { + return nil + } + + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{}) + if err != nil { + return fmt.Errorf("raft list cluster: %v", err) + } + fmt.Fprintf(writer, "the raft cluster has %d servers\n", len(resp.ClusterServers)) + for _, server := range resp.ClusterServers { + fmt.Fprintf(writer, " * %s %s (%s)\n", server.Id, server.Address, server.Suffrage) + } + + return nil + }) + + return err + +} diff --git a/weed/shell/command_cluster_raft_remove.go b/weed/shell/command_cluster_raft_remove.go new file mode 100644 index 000000000..532a1469c --- /dev/null +++ b/weed/shell/command_cluster_raft_remove.go @@ -0,0 +1,56 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "io" +) + +func init() { + Commands = append(Commands, &commandRaftServerRemove{}) +} + +type commandRaftServerRemove struct { +} + +func (c *commandRaftServerRemove) Name() string { + return "cluster.raft.remove" +} + +func (c *commandRaftServerRemove) Help() string { + return `remove a server from the raft cluster + + Example: + cluster.raft.remove -id <server_name> +` +} + +func (c *commandRaftServerRemove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + serverId := raftServerAddCommand.String("id", "", "server id") + if err = raftServerAddCommand.Parse(args); err != nil { + return nil + } + + if *serverId == "" { + return fmt.Errorf("empty server id") + } + + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: *serverId, + Force: true, + }) + if err != nil { + return fmt.Errorf("raft remove server: %v", err) + } + println("removed server", *serverId) + return nil + }) + + return err + +} diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go index 4f893df7a..a6dc8f574 100644 --- a/weed/shell/command_s3_clean_uploads.go +++ b/weed/shell/command_s3_clean_uploads.go @@ -3,6 +3,7 @@ package shell import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "io" "math" @@ -39,6 +40,8 @@ func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer return nil } + signingKey := util.GetViper().GetString("jwt.signing.key") + var filerBucketsPath string filerBucketsPath, err = readFilerBucketsPath(commandEnv) if err != nil { @@ -55,14 +58,16 @@ func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer } for _, bucket := range buckets { - c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo) + if err := c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo, signingKey); err != nil { + fmt.Fprintf(writer, fmt.Sprintf("failed cleanup uploads for backet %s: %v", bucket, err)) + } } return err } -func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error { +func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration, signingKey string) error { uploadsDir := filerBucketsPath + "/" + bucket + "/.uploads" var staleUploads []string now := time.Now() @@ -77,12 +82,17 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io return fmt.Errorf("list uploads under %v: %v", uploadsDir, err) } + var encodedJwt security.EncodedJwt + if signingKey != "" { + encodedJwt = security.GenJwtForFilerServer(security.SigningKey(signingKey), 15*60) + } + for _, staleUpload := range staleUploads { deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload) fmt.Fprintf(writer, "purge %s\n", deleteUrl) - err = util.Delete(deleteUrl, "") - if err != nil { + err = util.Delete(deleteUrl, string(encodedJwt)) + if err != nil && err.Error() != "" { return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err) } } |
