aboutsummaryrefslogtreecommitdiff
path: root/weed/command/master.go
diff options
context:
space:
mode:
authoryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
committeryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
commit46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch)
tree734125b48b6d96f8796a2b89b924312cd169ef0e /weed/command/master.go
parenta5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff)
parentdc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff)
downloadseaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz
seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip
Update tikv client version and add one PC support
Diffstat (limited to 'weed/command/master.go')
-rw-r--r--weed/command/master.go167
1 files changed, 130 insertions, 37 deletions
diff --git a/weed/command/master.go b/weed/command/master.go
index 4eb43ee09..ab8466d47 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,23 +1,27 @@
package command
import (
- "github.com/chrislusf/raft/protobuf"
- "github.com/gorilla/mux"
- "google.golang.org/grpc/reflection"
+ "fmt"
+ "golang.org/x/exp/slices"
"net/http"
"os"
- "sort"
- "strconv"
+ "path"
"strings"
"time"
+ "github.com/chrislusf/raft/protobuf"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/gorilla/mux"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc/reflection"
+
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -28,6 +32,7 @@ var (
type MasterOptions struct {
port *int
+ portGrpc *int
ip *string
ipBind *string
metaFolder *string
@@ -42,13 +47,19 @@ type MasterOptions struct {
metricsAddress *string
metricsIntervalSec *int
raftResumeState *bool
+ metricsHttpPort *int
+ heartbeatInterval *time.Duration
+ electionTimeout *time.Duration
+ raftHashicorp *bool
+ raftBootstrap *bool
}
func init() {
cmdMaster.Run = runMaster // break init cycle
m.port = cmdMaster.Flag.Int("port", 9333, "http listen port")
+ m.portGrpc = cmdMaster.Flag.Int("port.grpc", 0, "grpc listen port")
m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address, also used as identifier")
- m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to")
+ m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
@@ -60,7 +71,12 @@ func init() {
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
+ m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
+ m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
+ m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers")
+ m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft")
+ m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
}
var cmdMaster = &Command{
@@ -103,6 +119,7 @@ func runMaster(cmd *Command, args []string) bool {
glog.Fatalf("volumeSizeLimitMB should be smaller than 30000")
}
+ go stats_collect.StartMetricsServer(*m.metricsHttpPort)
startMaster(m, masterWhiteList)
return true
@@ -112,65 +129,139 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
backend.LoadConfiguration(util.GetViper())
- myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
+ if *masterOption.portGrpc == 0 {
+ *masterOption.portGrpc = 10000 + *masterOption.port
+ }
+ if *masterOption.ipBind == "" {
+ *masterOption.ipBind = *masterOption.ip
+ }
+
+ myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers)
+
+ masterPeers := make(map[string]pb.ServerAddress)
+ for _, peer := range peers {
+ masterPeers[string(peer)] = peer
+ }
r := mux.NewRouter()
- ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
- listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port)
+ ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers)
+ listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
- masterListener, e := util.NewListener(listeningAddress, 0)
+ masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
}
+
// start raftServer
- raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
- peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, *masterOption.raftResumeState)
- if raftServer == nil {
- glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
+ metaDir := path.Join(*masterOption.metaFolder, fmt.Sprintf("m%d", *masterOption.port))
+ raftServerOption := &weed_server.RaftServerOption{
+ GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"),
+ Peers: masterPeers,
+ ServerAddr: myMasterAddress,
+ DataDir: util.ResolvePath(metaDir),
+ Topo: ms.Topo,
+ RaftResumeState: *masterOption.raftResumeState,
+ HeartbeatInterval: *masterOption.heartbeatInterval,
+ ElectionTimeout: *masterOption.electionTimeout,
+ RaftBootstrap: *m.raftBootstrap,
+ }
+ var raftServer *weed_server.RaftServer
+ var err error
+ if *m.raftHashicorp {
+ if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil {
+ glog.Fatalf("NewHashicorpRaftServer: %s", err)
+ }
+ } else {
+ raftServer, err = weed_server.NewRaftServer(raftServerOption)
+ if raftServer == nil {
+ glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
+ }
}
ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
+ if *m.raftHashicorp {
+ r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
+ }
// starting grpc server
- grpcPort := *masterOption.port + 10000
- grpcL, err := util.NewListener(*masterOption.ipBind+":"+strconv.Itoa(grpcPort), 0)
+ grpcPort := *masterOption.portGrpc
+ grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0)
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
- protobuf.RegisterRaftServer(grpcS, raftServer)
+ if *m.raftHashicorp {
+ raftServer.TransportManager.Register(grpcS)
+ } else {
+ protobuf.RegisterRaftServer(grpcS, raftServer)
+ }
reflection.Register(grpcS)
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort)
+ if grpcLocalL != nil {
+ go grpcS.Serve(grpcLocalL)
+ }
go grpcS.Serve(grpcL)
- go func() {
- time.Sleep(1500 * time.Millisecond)
- if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
- if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
- raftServer.DoJoinCommand()
+ timeSleep := 1500 * time.Millisecond
+ if !*m.raftHashicorp {
+ go func() {
+ time.Sleep(timeSleep)
+ if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
+ if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
+ raftServer.DoJoinCommand()
+ }
}
- }
- }()
+ }()
+ }
go ms.MasterClient.KeepConnectedToMaster()
// start http server
+ var (
+ clientCertFile,
+ certFile,
+ keyFile string
+ )
+ useTLS := false
+ useMTLS := false
+
+ if viper.GetString("https.master.key") != "" {
+ useTLS = true
+ certFile = viper.GetString("https.master.cert")
+ keyFile = viper.GetString("https.master.key")
+ }
+
+ if viper.GetString("https.master.ca") != "" {
+ useMTLS = true
+ clientCertFile = viper.GetString("https.master.ca")
+ }
+
httpS := &http.Server{Handler: r}
- go httpS.Serve(masterListener)
+ if masterLocalListner != nil {
+ go httpS.Serve(masterLocalListner)
+ }
+
+ if useMTLS {
+ httpS.TLSConfig = security.LoadClientTLSHTTP(clientCertFile)
+ }
+
+ if useTLS {
+ go httpS.ServeTLS(masterListener, certFile, keyFile)
+ } else {
+ go httpS.Serve(masterListener)
+ }
select {}
}
-func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) {
+func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers string) (masterAddress pb.ServerAddress, cleanedPeers []pb.ServerAddress) {
glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers)
- masterAddress = masterIp + ":" + strconv.Itoa(masterPort)
- if peers != "" {
- cleanedPeers = strings.Split(peers, ",")
- }
+ masterAddress = pb.NewServerAddress(masterIp, masterPort, masterGrpcPort)
+ cleanedPeers = pb.ServerAddresses(peers).ToAddresses()
hasSelf := false
for _, peer := range cleanedPeers {
- if peer == masterAddress {
+ if peer.ToHttpAddress() == masterAddress.ToHttpAddress() {
hasSelf = true
break
}
@@ -180,13 +271,15 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
cleanedPeers = append(cleanedPeers, masterAddress)
}
if len(cleanedPeers)%2 == 0 {
- glog.Fatalf("Only odd number of masters are supported!")
+ glog.Fatalf("Only odd number of masters are supported: %+v", cleanedPeers)
}
return
}
-func isTheFirstOne(self string, peers []string) bool {
- sort.Strings(peers)
+func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
+ slices.SortFunc(peers, func(a, b pb.ServerAddress) bool {
+ return strings.Compare(string(a), string(b)) < 0
+ })
if len(peers) <= 0 {
return true
}
@@ -194,9 +287,9 @@ func isTheFirstOne(self string, peers []string) bool {
}
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
+ masterAddress := pb.NewServerAddress(*m.ip, *m.port, *m.portGrpc)
return &weed_server.MasterOption{
- Host: *m.ip,
- Port: *m.port,
+ Master: masterAddress,
MetaFolder: *m.metaFolder,
VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB),
VolumePreallocate: *m.volumePreallocate,