aboutsummaryrefslogtreecommitdiff
path: root/weed/server/raft_server.go
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-04-04 13:50:56 +0500
committerKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-04-04 13:50:56 +0500
commitc514710b7b0454c7a070ebf30aa865c9a5ad1591 (patch)
tree00df9926c66655ba554310aeb48ef4e0a313da3c /weed/server/raft_server.go
parentdfe5bfbe2be0eed360ffcbdd1f25fa5a16a5eb55 (diff)
downloadseaweedfs-c514710b7b0454c7a070ebf30aa865c9a5ad1591.tar.xz
seaweedfs-c514710b7b0454c7a070ebf30aa865c9a5ad1591.zip
initial add hashicorp raft
Diffstat (limited to 'weed/server/raft_server.go')
-rw-r--r--weed/server/raft_server.go49
1 files changed, 44 insertions, 5 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index d559cb691..5ed3724e2 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,6 +2,9 @@ package weed_server
import (
"encoding/json"
+ transport "github.com/Jille/raft-grpc-transport"
+ "io"
+ "io/ioutil"
"math/rand"
"os"
"path"
@@ -12,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
@@ -26,14 +30,17 @@ type RaftServerOption struct {
RaftResumeState bool
HeartbeatInterval time.Duration
ElectionTimeout time.Duration
+ RaftBootstrap bool
}
type RaftServer struct {
- peers map[string]pb.ServerAddress // initial peers to join with
- raftServer raft.Server
- dataDir string
- serverAddr pb.ServerAddress
- topo *topology.Topology
+ peers map[string]pb.ServerAddress // initial peers to join with
+ raftServer raft.Server
+ raftHashicorp hashicorpRaft.Raft
+ TransportManager *transport.Manager
+ dataDir string
+ serverAddr pb.ServerAddress
+ topo *topology.Topology
*raft.GrpcServer
}
@@ -42,6 +49,8 @@ type StateMachine struct {
topo *topology.Topology
}
+var _ hashicorpRaft.FSM = &StateMachine{}
+
func (s StateMachine) Save() ([]byte, error) {
state := topology.MaxVolumeIdCommand{
MaxVolumeId: s.topo.GetMaxVolumeId(),
@@ -61,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
+func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
+ before := s.topo.GetMaxVolumeId()
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(l.Data, &state)
+ if err != nil {
+ return err
+ }
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+
+ glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
+ return nil
+}
+
+func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
+ return &topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }, nil
+}
+
+func (s *StateMachine) Restore(r io.ReadCloser) error {
+ b, err := ioutil.ReadAll(r)
+ if err != nil {
+ return err
+ }
+ if err := s.Recovery(b); err != nil {
+ return err
+ }
+ return nil
+}
+
func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,