diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-04 13:50:56 +0500 |
|---|---|---|
| committer | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-04 13:50:56 +0500 |
| commit | c514710b7b0454c7a070ebf30aa865c9a5ad1591 (patch) | |
| tree | 00df9926c66655ba554310aeb48ef4e0a313da3c /weed/server/raft_server.go | |
| parent | dfe5bfbe2be0eed360ffcbdd1f25fa5a16a5eb55 (diff) | |
| download | seaweedfs-c514710b7b0454c7a070ebf30aa865c9a5ad1591.tar.xz seaweedfs-c514710b7b0454c7a070ebf30aa865c9a5ad1591.zip | |
initial add hashicorp raft
Diffstat (limited to 'weed/server/raft_server.go')
| -rw-r--r-- | weed/server/raft_server.go | 49 |
1 files changed, 44 insertions, 5 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index d559cb691..5ed3724e2 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,6 +2,9 @@ package weed_server import ( "encoding/json" + transport "github.com/Jille/raft-grpc-transport" + "io" + "io/ioutil" "math/rand" "os" "path" @@ -12,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" @@ -26,14 +30,17 @@ type RaftServerOption struct { RaftResumeState bool HeartbeatInterval time.Duration ElectionTimeout time.Duration + RaftBootstrap bool } type RaftServer struct { - peers map[string]pb.ServerAddress // initial peers to join with - raftServer raft.Server - dataDir string - serverAddr pb.ServerAddress - topo *topology.Topology + peers map[string]pb.ServerAddress // initial peers to join with + raftServer raft.Server + raftHashicorp hashicorpRaft.Raft + TransportManager *transport.Manager + dataDir string + serverAddr pb.ServerAddress + topo *topology.Topology *raft.GrpcServer } @@ -42,6 +49,8 @@ type StateMachine struct { topo *topology.Topology } +var _ hashicorpRaft.FSM = &StateMachine{} + func (s StateMachine) Save() ([]byte, error) { state := topology.MaxVolumeIdCommand{ MaxVolumeId: s.topo.GetMaxVolumeId(), @@ -61,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error { return nil } +func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} { + before := s.topo.GetMaxVolumeId() + state := topology.MaxVolumeIdCommand{} + err := json.Unmarshal(l.Data, &state) + if err != nil { + return err + } + s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + + glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId()) + return nil +} + +func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) { + return &topology.MaxVolumeIdCommand{ + MaxVolumeId: s.topo.GetMaxVolumeId(), + }, nil +} + +func (s *StateMachine) Restore(r io.ReadCloser) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return err + } + if err := s.Recovery(b); err != nil { + return err + } + return nil +} + func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, |
