aboutsummaryrefslogtreecommitdiff
path: root/weed/server/raft_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/raft_server.go')
-rw-r--r--weed/server/raft_server.go136
1 files changed, 86 insertions, 50 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 85841e409..ad0a1c8ce 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,10 +2,12 @@ package weed_server
import (
"encoding/json"
+ transport "github.com/Jille/raft-grpc-transport"
+ "io"
+ "io/ioutil"
"math/rand"
"os"
"path"
- "sort"
"time"
"google.golang.org/grpc"
@@ -13,17 +15,32 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
)
+type RaftServerOption struct {
+ GrpcDialOption grpc.DialOption
+ Peers map[string]pb.ServerAddress
+ ServerAddr pb.ServerAddress
+ DataDir string
+ Topo *topology.Topology
+ RaftResumeState bool
+ HeartbeatInterval time.Duration
+ ElectionTimeout time.Duration
+ RaftBootstrap bool
+}
+
type RaftServer struct {
- peers []string // initial peers to join with
- raftServer raft.Server
- dataDir string
- serverAddr string
- topo *topology.Topology
+ peers map[string]pb.ServerAddress // initial peers to join with
+ raftServer raft.Server
+ RaftHashicorp *hashicorpRaft.Raft
+ TransportManager *transport.Manager
+ dataDir string
+ serverAddr pb.ServerAddress
+ topo *topology.Topology
*raft.GrpcServer
}
@@ -32,6 +49,8 @@ type StateMachine struct {
topo *topology.Topology
}
+var _ hashicorpRaft.FSM = &StateMachine{}
+
func (s StateMachine) Save() ([]byte, error) {
state := topology.MaxVolumeIdCommand{
MaxVolumeId: s.topo.GetMaxVolumeId(),
@@ -51,12 +70,42 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
+func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
+ before := s.topo.GetMaxVolumeId()
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(l.Data, &state)
+ if err != nil {
+ return err
+ }
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+
+ glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
+ return nil
+}
+
+func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
+ return &topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }, nil
+}
+
+func (s *StateMachine) Restore(r io.ReadCloser) error {
+ b, err := ioutil.ReadAll(r)
+ if err != nil {
+ return err
+ }
+ if err := s.Recovery(b); err != nil {
+ return err
+ }
+ return nil
+}
+
+func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
- peers: peers,
- serverAddr: serverAddr,
- dataDir: dataDir,
- topo: topo,
+ peers: option.Peers,
+ serverAddr: option.ServerAddr,
+ dataDir: option.DataDir,
+ topo: option.Topo,
}
if glog.V(4) {
@@ -66,27 +115,29 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
var err error
- transporter := raft.NewGrpcTransporter(grpcDialOption)
- glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
+ transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
+ glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)
- if !raftResumeState {
+ // always clear previous log to avoid server is promotable
+ os.RemoveAll(path.Join(s.dataDir, "log"))
+ if !option.RaftResumeState {
// always clear previous metadata
os.RemoveAll(path.Join(s.dataDir, "conf"))
- os.RemoveAll(path.Join(s.dataDir, "log"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
- if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil {
return nil, err
}
- stateMachine := StateMachine{topo: topo}
- s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
+ stateMachine := StateMachine{topo: option.Topo}
+ s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
if err != nil {
glog.V(0).Infoln(err)
return nil, err
}
- s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond)
- s.raftServer.SetElectionTimeout(10 * time.Second)
+ heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
+ s.raftServer.SetHeartbeatInterval(heartbeatInterval)
+ s.raftServer.SetElectionTimeout(option.ElectionTimeout)
if err := s.raftServer.LoadSnapshot(); err != nil {
return nil, err
}
@@ -94,68 +145,53 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
return nil, err
}
- for _, peer := range s.peers {
- if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
+ for name, peer := range s.peers {
+ if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
return nil, err
}
}
// Remove deleted peers
for existsPeerName := range s.raftServer.Peers() {
- exists, existingPeer := false, ""
- for _, peer := range s.peers {
- if pb.ServerToGrpcAddress(peer) == existsPeerName {
- exists, existingPeer = true, peer
- break
- }
- }
- if exists {
+ if existingPeer, found := s.peers[existsPeerName]; !found {
if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
glog.V(0).Infoln(err)
return nil, err
} else {
- glog.V(0).Infof("removing old peer %s", existingPeer)
+ glog.V(0).Infof("removing old peer: %s", existingPeer)
}
}
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
- if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
- // Initialize the server by joining itself.
- // s.DoJoinCommand()
- }
-
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
return s, nil
}
func (s *RaftServer) Peers() (members []string) {
- peers := s.raftServer.Peers()
-
- for _, p := range peers {
- members = append(members, p.Name)
+ if s.raftServer != nil {
+ peers := s.raftServer.Peers()
+ for _, p := range peers {
+ members = append(members, p.Name)
+ }
+ } else if s.RaftHashicorp != nil {
+ cfg := s.RaftHashicorp.GetConfiguration()
+ for _, p := range cfg.Configuration().Servers {
+ members = append(members, string(p.ID))
+ }
}
-
return
}
-func isTheFirstOne(self string, peers []string) bool {
- sort.Strings(peers)
- if len(peers) <= 0 {
- return true
- }
- return self == peers[0]
-}
-
func (s *RaftServer) DoJoinCommand() {
glog.V(0).Infoln("Initializing new cluster")
if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
- ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
+ ConnectionString: s.serverAddr.ToGrpcAddress(),
}); err != nil {
glog.Errorf("fail to send join command: %v", err)
}