diff options
Diffstat (limited to 'weed/command/master.go')
| -rw-r--r-- | weed/command/master.go | 60 |
1 files changed, 44 insertions, 16 deletions
diff --git a/weed/command/master.go b/weed/command/master.go index 9e45c5037..9587df055 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,9 +1,9 @@ package command import ( + "golang.org/x/exp/slices" "net/http" "os" - "sort" "strings" "time" @@ -48,6 +48,8 @@ type MasterOptions struct { metricsHttpPort *int heartbeatInterval *time.Duration electionTimeout *time.Duration + raftHashicorp *bool + raftBootstrap *bool } func init() { @@ -71,6 +73,8 @@ func init() { m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server") m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers") + m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft") + m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") } var cmdMaster = &Command{ @@ -132,8 +136,13 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers) + masterPeers := make(map[string]pb.ServerAddress) + for _, peer := range peers { + masterPeers[string(peer)] = peer + } + r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers) + ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers) listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port) glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress) masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0) @@ -144,20 +153,32 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { // start raftServer raftServerOption := &weed_server.RaftServerOption{ GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"), - Peers: peers, + Peers: masterPeers, ServerAddr: myMasterAddress, DataDir: util.ResolvePath(*masterOption.metaFolder), Topo: ms.Topo, RaftResumeState: *masterOption.raftResumeState, HeartbeatInterval: *masterOption.heartbeatInterval, ElectionTimeout: *masterOption.electionTimeout, + RaftBootstrap: *m.raftBootstrap, } - raftServer, err := weed_server.NewRaftServer(raftServerOption) - if raftServer == nil { - glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + var raftServer *weed_server.RaftServer + var err error + if *m.raftHashicorp { + if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil { + glog.Fatalf("NewHashicorpRaftServer: %s", err) + } + } else { + raftServer, err = weed_server.NewRaftServer(raftServerOption) + if raftServer == nil { + glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + } } ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") + if *m.raftHashicorp { + r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET") + } // starting grpc server grpcPort := *masterOption.portGrpc grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0) @@ -166,7 +187,11 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) - protobuf.RegisterRaftServer(grpcS, raftServer) + if *m.raftHashicorp { + raftServer.TransportManager.Register(grpcS) + } else { + protobuf.RegisterRaftServer(grpcS, raftServer) + } reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort) if grpcLocalL != nil { @@ -174,14 +199,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } go grpcS.Serve(grpcL) - go func() { - time.Sleep(1500 * time.Millisecond) - if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { - if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { - raftServer.DoJoinCommand() + timeSleep := 1500 * time.Millisecond + if !*m.raftHashicorp { + go func() { + time.Sleep(timeSleep) + if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { + if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { + raftServer.DoJoinCommand() + } } - } - }() + }() + } go ms.MasterClient.KeepConnectedToMaster() @@ -246,8 +274,8 @@ func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers strin } func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { - sort.Slice(peers, func(i, j int) bool { - return strings.Compare(string(peers[i]), string(peers[j])) < 0 + slices.SortFunc(peers, func(a, b pb.ServerAddress) bool { + return strings.Compare(string(a), string(b)) < 0 }) if len(peers) <= 0 { return true |
