aboutsummaryrefslogtreecommitdiff
path: root/weed/server/raft_server.go
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
committerbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
commitd861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/server/raft_server.go
parent70da715d8d917527291b35fb069fac077d17b868 (diff)
parent4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
downloadseaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz
seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/server/raft_server.go')
-rw-r--r--weed/server/raft_server.go138
1 files changed, 84 insertions, 54 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 53289f1c1..85841e409 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,16 +2,18 @@ package weed_server
import (
"encoding/json"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "io/ioutil"
+ "math/rand"
"os"
"path"
- "reflect"
"sort"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -25,7 +27,31 @@ type RaftServer struct {
*raft.GrpcServer
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
+type StateMachine struct {
+ raft.StateMachine
+ topo *topology.Topology
+}
+
+func (s StateMachine) Save() ([]byte, error) {
+ state := topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }
+ glog.V(1).Infof("Save raft state %+v", state)
+ return json.Marshal(state)
+}
+
+func (s StateMachine) Recovery(data []byte) error {
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(data, &state)
+ if err != nil {
+ return err
+ }
+ glog.V(1).Infof("Recovery raft state %+v", state)
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+ return nil
+}
+
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
s := &RaftServer{
peers: peers,
serverAddr: serverAddr,
@@ -43,47 +69,66 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
transporter := raft.NewGrpcTransporter(grpcDialOption)
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
- // Clear old cluster configurations if peers are changed
- if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
- glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
+ if !raftResumeState {
+ // always clear previous metadata
os.RemoveAll(path.Join(s.dataDir, "conf"))
os.RemoveAll(path.Join(s.dataDir, "log"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
+ return nil, err
+ }
- s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
+ stateMachine := StateMachine{topo: topo}
+ s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
if err != nil {
glog.V(0).Infoln(err)
- return nil
+ return nil, err
+ }
+ s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond)
+ s.raftServer.SetElectionTimeout(10 * time.Second)
+ if err := s.raftServer.LoadSnapshot(); err != nil {
+ return nil, err
+ }
+ if err := s.raftServer.Start(); err != nil {
+ return nil, err
}
- s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
- s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
- s.raftServer.Start()
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer))
+ if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
+ return nil, err
+ }
+ }
+
+ // Remove deleted peers
+ for existsPeerName := range s.raftServer.Peers() {
+ exists, existingPeer := false, ""
+ for _, peer := range s.peers {
+ if pb.ServerToGrpcAddress(peer) == existsPeerName {
+ exists, existingPeer = true, peer
+ break
+ }
+ }
+ if exists {
+ if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
+ glog.V(0).Infoln(err)
+ return nil, err
+ } else {
+ glog.V(0).Infof("removing old peer %s", existingPeer)
+ }
+ }
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
// Initialize the server by joining itself.
- glog.V(0).Infoln("Initializing new cluster")
-
- _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
- Name: s.raftServer.Name(),
- ConnectionString: util.ServerToGrpcAddress(s.serverAddr),
- })
-
- if err != nil {
- glog.V(0).Infoln(err)
- return nil
- }
+ // s.DoJoinCommand()
}
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
- return s
+ return s, nil
}
func (s *RaftServer) Peers() (members []string) {
@@ -96,34 +141,6 @@ func (s *RaftServer) Peers() (members []string) {
return
}
-func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) {
- confPath := path.Join(dir, "conf")
- // open conf file
- b, err := ioutil.ReadFile(confPath)
- if err != nil {
- return oldPeers, true
- }
- conf := &raft.Config{}
- if err = json.Unmarshal(b, conf); err != nil {
- return oldPeers, true
- }
-
- for _, p := range conf.Peers {
- oldPeers = append(oldPeers, p.Name)
- }
- oldPeers = append(oldPeers, self)
-
- if len(peers) == 0 && len(oldPeers) <= 1 {
- return oldPeers, false
- }
-
- sort.Strings(peers)
- sort.Strings(oldPeers)
-
- return oldPeers, !reflect.DeepEqual(peers, oldPeers)
-
-}
-
func isTheFirstOne(self string, peers []string) bool {
sort.Strings(peers)
if len(peers) <= 0 {
@@ -131,3 +148,16 @@ func isTheFirstOne(self string, peers []string) bool {
}
return self == peers[0]
}
+
+func (s *RaftServer) DoJoinCommand() {
+
+ glog.V(0).Infoln("Initializing new cluster")
+
+ if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
+ Name: s.raftServer.Name(),
+ ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
+ }); err != nil {
+ glog.Errorf("fail to send join command: %v", err)
+ }
+
+}