aboutsummaryrefslogtreecommitdiff
path: root/weed/server/raft_server.go
diff options
context:
space:
mode:
authorУстюжанин Антон Александрович <ustuzhanin@tochka.com>2020-10-02 23:01:20 +0500
committerУстюжанин Антон Александрович <ustuzhanin@tochka.com>2020-10-02 23:01:20 +0500
commit8c82fb7e5f0604953d0f8430f3752c0b2bfcada8 (patch)
tree3988c913f77ace077f3db64ab90d10df2ea333c6 /weed/server/raft_server.go
parent3e0a79ef050dba9e5347d20537ef562cc4b30b62 (diff)
downloadseaweedfs-8c82fb7e5f0604953d0f8430f3752c0b2bfcada8.tar.xz
seaweedfs-8c82fb7e5f0604953d0f8430f3752c0b2bfcada8.zip
fix: restore raft state
Diffstat (limited to 'weed/server/raft_server.go')
-rw-r--r--weed/server/raft_server.go61
1 files changed, 50 insertions, 11 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 958680d2b..0679215a1 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -28,7 +28,31 @@ type RaftServer struct {
*raft.GrpcServer
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
+type StateMachine struct {
+ raft.StateMachine
+ topo *topology.Topology
+}
+
+func (s StateMachine) Save() ([]byte, error) {
+ state := topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }
+ glog.V(1).Infof("Save raft state %+v", state)
+ return json.Marshal(state)
+}
+
+func (s StateMachine) Recovery(data []byte) error {
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(data, &state)
+ if err != nil {
+ return err
+ }
+ glog.V(1).Infof("Recovery raft state %+v", state)
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+ return nil
+}
+
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int, cleanState bool) (*RaftServer, error) {
s := &RaftServer{
peers: peers,
serverAddr: serverAddr,
@@ -46,26 +70,41 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
transporter := raft.NewGrpcTransporter(grpcDialOption)
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
- // always clear previous metadata
- os.RemoveAll(path.Join(s.dataDir, "conf"))
- os.RemoveAll(path.Join(s.dataDir, "log"))
- os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+ if cleanState {
+ // always clear previous metadata
+ os.RemoveAll(path.Join(s.dataDir, "conf"))
+ os.RemoveAll(path.Join(s.dataDir, "log"))
+ os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+ }
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
+ return nil, err
+ }
+
// Clear old cluster configurations if peers are changed
if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
}
- s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
+ stateMachine := StateMachine{topo: topo}
+ s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
if err != nil {
glog.V(0).Infoln(err)
- return nil
+ return nil, err
}
s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
- s.raftServer.Start()
+ if err := s.raftServer.LoadSnapshot(); err != nil {
+ return nil, err
+ }
+ if err := s.raftServer.Start(); err != nil {
+ return nil, err
+ }
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
+ if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
+ return nil, err
+ }
+
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
@@ -81,13 +120,13 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
if err != nil {
glog.V(0).Infoln(err)
- return nil
+ return nil, err
}
}
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
- return s
+ return s, nil
}
func (s *RaftServer) Peers() (members []string) {