diff options
Diffstat (limited to 'weed/server/raft_server.go')
| -rw-r--r-- | weed/server/raft_server.go | 99 |
1 files changed, 59 insertions, 40 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index f22b7c45d..8c372f0cc 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,11 +2,12 @@ package weed_server import ( "encoding/json" + transport "github.com/Jille/raft-grpc-transport" + "io" + "io/ioutil" "math/rand" "os" "path" - "sort" - "strings" "time" "google.golang.org/grpc" @@ -14,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" @@ -21,21 +23,24 @@ import ( type RaftServerOption struct { GrpcDialOption grpc.DialOption - Peers []pb.ServerAddress + Peers map[string]pb.ServerAddress ServerAddr pb.ServerAddress DataDir string Topo *topology.Topology RaftResumeState bool HeartbeatInterval time.Duration ElectionTimeout time.Duration + RaftBootstrap bool } type RaftServer struct { - peers []pb.ServerAddress // initial peers to join with - raftServer raft.Server - dataDir string - serverAddr pb.ServerAddress - topo *topology.Topology + peers map[string]pb.ServerAddress // initial peers to join with + raftServer raft.Server + RaftHashicorp *hashicorpRaft.Raft + TransportManager *transport.Manager + dataDir string + serverAddr pb.ServerAddress + topo *topology.Topology *raft.GrpcServer } @@ -44,6 +49,8 @@ type StateMachine struct { topo *topology.Topology } +var _ hashicorpRaft.FSM = &StateMachine{} + func (s StateMachine) Save() ([]byte, error) { state := topology.MaxVolumeIdCommand{ MaxVolumeId: s.topo.GetMaxVolumeId(), @@ -63,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error { return nil } +func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} { + before := s.topo.GetMaxVolumeId() + state := topology.MaxVolumeIdCommand{} + err := json.Unmarshal(l.Data, &state) + if err != nil { + return err + } + s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + + glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId()) + return nil +} + +func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) { + return &topology.MaxVolumeIdCommand{ + MaxVolumeId: s.topo.GetMaxVolumeId(), + }, nil +} + +func (s *StateMachine) Restore(r io.ReadCloser) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return err + } + if err := s.Recovery(b); err != nil { + return err + } + return nil +} + func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, @@ -88,7 +125,7 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) } - if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil { + if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil { return nil, err } @@ -108,23 +145,15 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { return nil, err } - for _, peer := range s.peers { - if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil { + for name, peer := range s.peers { + if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil { return nil, err } } // Remove deleted peers for existsPeerName := range s.raftServer.Peers() { - exists := false - var existingPeer pb.ServerAddress - for _, peer := range s.peers { - if peer.String() == existsPeerName { - exists, existingPeer = true, peer - break - } - } - if !exists { + if existingPeer, found := s.peers[existsPeerName]; !found { if err := s.raftServer.RemovePeer(existsPeerName); err != nil { glog.V(0).Infoln(err) return nil, err @@ -136,36 +165,26 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s.GrpcServer = raft.NewGrpcServer(s.raftServer) - if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) { - // Initialize the server by joining itself. - // s.DoJoinCommand() - } - glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) return s, nil } func (s *RaftServer) Peers() (members []string) { - peers := s.raftServer.Peers() - - for _, p := range peers { - members = append(members, p.Name) + if s.raftServer != nil { + peers := s.raftServer.Peers() + for _, p := range peers { + members = append(members, p.Name) + } + } else if s.RaftHashicorp != nil { + cfg := s.RaftHashicorp.GetConfiguration() + for _, p := range cfg.Configuration().Servers { + members = append(members, string(p.ID)) + } } - return } -func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { - sort.Slice(peers, func(i, j int) bool { - return strings.Compare(string(peers[i]), string(peers[j])) < 0 - }) - if len(peers) <= 0 { - return true - } - return self == peers[0] -} - func (s *RaftServer) DoJoinCommand() { glog.V(0).Infoln("Initializing new cluster") |
