diff options
| author | garenchan <garenchan23@gmail.com> | 2022-02-12 18:19:49 +0800 |
|---|---|---|
| committer | garenchan <garenchan23@gmail.com> | 2022-02-14 21:09:07 +0800 |
| commit | bd032eabe7def055e617440513546819b5b3d88c (patch) | |
| tree | 33d3e01fa4ee1f2543095bea1622e1b9f5a706ba /weed/server/raft_server.go | |
| parent | 0c1f42f4eba8b00c6599dfec52413a18e25fea2e (diff) | |
| download | seaweedfs-bd032eabe7def055e617440513546819b5b3d88c.tar.xz seaweedfs-bd032eabe7def055e617440513546819b5b3d88c.zip | |
[UPDATE] Make heartbeat interval and election timeout of masters configurable.
Diffstat (limited to 'weed/server/raft_server.go')
| -rw-r--r-- | weed/server/raft_server.go | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 568bfc7b5..91dd185c8 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -19,6 +19,17 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" ) +type RaftServerOption struct { + GrpcDialOption grpc.DialOption + Peers []pb.ServerAddress + ServerAddr pb.ServerAddress + DataDir string + Topo *topology.Topology + RaftResumeState bool + HeartbeatInterval time.Duration + ElectionTimeout time.Duration +} + type RaftServer struct { peers []pb.ServerAddress // initial peers to join with raftServer raft.Server @@ -52,12 +63,12 @@ func (s StateMachine) Recovery(data []byte) error { return nil } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, serverAddr pb.ServerAddress, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) { +func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ - peers: peers, - serverAddr: serverAddr, - dataDir: dataDir, - topo: topo, + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, } if glog.V(4) { @@ -67,10 +78,10 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) var err error - transporter := raft.NewGrpcTransporter(grpcDialOption) - glog.V(0).Infof("Starting RaftServer with %v", serverAddr) + transporter := raft.NewGrpcTransporter(option.GrpcDialOption) + glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr) - if !raftResumeState { + if !option.RaftResumeState { // always clear previous metadata os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "log")) @@ -80,14 +91,15 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser return nil, err } - stateMachine := StateMachine{topo: topo} - s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, topo, "") + stateMachine := StateMachine{topo: option.Topo} + s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "") if err != nil { glog.V(0).Infoln(err) return nil, err } - s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond) - s.raftServer.SetElectionTimeout(10 * time.Second) + heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + s.raftServer.SetHeartbeatInterval(heartbeatInterval) + s.raftServer.SetElectionTimeout(option.ElectionTimeout) if err := s.raftServer.LoadSnapshot(); err != nil { return nil, err } @@ -123,7 +135,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser s.GrpcServer = raft.NewGrpcServer(s.raftServer) - if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { + if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) { // Initialize the server by joining itself. // s.DoJoinCommand() } |
