diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/command/master.go | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'weed/command/master.go')
| -rw-r--r-- | weed/command/master.go | 167 |
1 files changed, 130 insertions, 37 deletions
diff --git a/weed/command/master.go b/weed/command/master.go index 4eb43ee09..ab8466d47 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,23 +1,27 @@ package command import ( - "github.com/chrislusf/raft/protobuf" - "github.com/gorilla/mux" - "google.golang.org/grpc/reflection" + "fmt" + "golang.org/x/exp/slices" "net/http" "os" - "sort" - "strconv" + "path" "strings" "time" + "github.com/chrislusf/raft/protobuf" + stats_collect "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/gorilla/mux" + "github.com/spf13/viper" + "google.golang.org/grpc/reflection" + "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" + weed_server "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -28,6 +32,7 @@ var ( type MasterOptions struct { port *int + portGrpc *int ip *string ipBind *string metaFolder *string @@ -42,13 +47,19 @@ type MasterOptions struct { metricsAddress *string metricsIntervalSec *int raftResumeState *bool + metricsHttpPort *int + heartbeatInterval *time.Duration + electionTimeout *time.Duration + raftHashicorp *bool + raftBootstrap *bool } func init() { cmdMaster.Run = runMaster // break init cycle m.port = cmdMaster.Flag.Int("port", 9333, "http listen port") + m.portGrpc = cmdMaster.Flag.Int("port.grpc", 0, "grpc listen port") m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address, also used as identifier") - m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to") + m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.") m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") @@ -60,7 +71,12 @@ func init() { m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>") m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") + m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server") + m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") + m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers") + m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft") + m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") } var cmdMaster = &Command{ @@ -103,6 +119,7 @@ func runMaster(cmd *Command, args []string) bool { glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") } + go stats_collect.StartMetricsServer(*m.metricsHttpPort) startMaster(m, masterWhiteList) return true @@ -112,65 +129,139 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { backend.LoadConfiguration(util.GetViper()) - myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers) + if *masterOption.portGrpc == 0 { + *masterOption.portGrpc = 10000 + *masterOption.port + } + if *masterOption.ipBind == "" { + *masterOption.ipBind = *masterOption.ip + } + + myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers) + + masterPeers := make(map[string]pb.ServerAddress) + for _, peer := range peers { + masterPeers[string(peer)] = peer + } r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers) - listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port) + ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers) + listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port) glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress) - masterListener, e := util.NewListener(listeningAddress, 0) + masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0) if e != nil { glog.Fatalf("Master startup error: %v", e) } + // start raftServer - raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"), - peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, *masterOption.raftResumeState) - if raftServer == nil { - glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + metaDir := path.Join(*masterOption.metaFolder, fmt.Sprintf("m%d", *masterOption.port)) + raftServerOption := &weed_server.RaftServerOption{ + GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"), + Peers: masterPeers, + ServerAddr: myMasterAddress, + DataDir: util.ResolvePath(metaDir), + Topo: ms.Topo, + RaftResumeState: *masterOption.raftResumeState, + HeartbeatInterval: *masterOption.heartbeatInterval, + ElectionTimeout: *masterOption.electionTimeout, + RaftBootstrap: *m.raftBootstrap, + } + var raftServer *weed_server.RaftServer + var err error + if *m.raftHashicorp { + if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil { + glog.Fatalf("NewHashicorpRaftServer: %s", err) + } + } else { + raftServer, err = weed_server.NewRaftServer(raftServerOption) + if raftServer == nil { + glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + } } ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") + if *m.raftHashicorp { + r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET") + } // starting grpc server - grpcPort := *masterOption.port + 10000 - grpcL, err := util.NewListener(*masterOption.ipBind+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *masterOption.portGrpc + grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0) if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) - protobuf.RegisterRaftServer(grpcS, raftServer) + if *m.raftHashicorp { + raftServer.TransportManager.Register(grpcS) + } else { + protobuf.RegisterRaftServer(grpcS, raftServer) + } reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort) + if grpcLocalL != nil { + go grpcS.Serve(grpcLocalL) + } go grpcS.Serve(grpcL) - go func() { - time.Sleep(1500 * time.Millisecond) - if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { - if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { - raftServer.DoJoinCommand() + timeSleep := 1500 * time.Millisecond + if !*m.raftHashicorp { + go func() { + time.Sleep(timeSleep) + if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { + if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { + raftServer.DoJoinCommand() + } } - } - }() + }() + } go ms.MasterClient.KeepConnectedToMaster() // start http server + var ( + clientCertFile, + certFile, + keyFile string + ) + useTLS := false + useMTLS := false + + if viper.GetString("https.master.key") != "" { + useTLS = true + certFile = viper.GetString("https.master.cert") + keyFile = viper.GetString("https.master.key") + } + + if viper.GetString("https.master.ca") != "" { + useMTLS = true + clientCertFile = viper.GetString("https.master.ca") + } + httpS := &http.Server{Handler: r} - go httpS.Serve(masterListener) + if masterLocalListner != nil { + go httpS.Serve(masterLocalListner) + } + + if useMTLS { + httpS.TLSConfig = security.LoadClientTLSHTTP(clientCertFile) + } + + if useTLS { + go httpS.ServeTLS(masterListener, certFile, keyFile) + } else { + go httpS.Serve(masterListener) + } select {} } -func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) { +func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers string) (masterAddress pb.ServerAddress, cleanedPeers []pb.ServerAddress) { glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers) - masterAddress = masterIp + ":" + strconv.Itoa(masterPort) - if peers != "" { - cleanedPeers = strings.Split(peers, ",") - } + masterAddress = pb.NewServerAddress(masterIp, masterPort, masterGrpcPort) + cleanedPeers = pb.ServerAddresses(peers).ToAddresses() hasSelf := false for _, peer := range cleanedPeers { - if peer == masterAddress { + if peer.ToHttpAddress() == masterAddress.ToHttpAddress() { hasSelf = true break } @@ -180,13 +271,15 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st cleanedPeers = append(cleanedPeers, masterAddress) } if len(cleanedPeers)%2 == 0 { - glog.Fatalf("Only odd number of masters are supported!") + glog.Fatalf("Only odd number of masters are supported: %+v", cleanedPeers) } return } -func isTheFirstOne(self string, peers []string) bool { - sort.Strings(peers) +func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { + slices.SortFunc(peers, func(a, b pb.ServerAddress) bool { + return strings.Compare(string(a), string(b)) < 0 + }) if len(peers) <= 0 { return true } @@ -194,9 +287,9 @@ func isTheFirstOne(self string, peers []string) bool { } func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { + masterAddress := pb.NewServerAddress(*m.ip, *m.port, *m.portGrpc) return &weed_server.MasterOption{ - Host: *m.ip, - Port: *m.port, + Master: masterAddress, MetaFolder: *m.metaFolder, VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB), VolumePreallocate: *m.volumePreallocate, |
