diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-04 13:50:56 +0500 |
|---|---|---|
| committer | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-04 13:50:56 +0500 |
| commit | c514710b7b0454c7a070ebf30aa865c9a5ad1591 (patch) | |
| tree | 00df9926c66655ba554310aeb48ef4e0a313da3c /weed | |
| parent | dfe5bfbe2be0eed360ffcbdd1f25fa5a16a5eb55 (diff) | |
| download | seaweedfs-c514710b7b0454c7a070ebf30aa865c9a5ad1591.tar.xz seaweedfs-c514710b7b0454c7a070ebf30aa865c9a5ad1591.zip | |
initial add hashicorp raft
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/master.go | 24 | ||||
| -rw-r--r-- | weed/server/raft_hashicorp.go | 98 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 49 | ||||
| -rw-r--r-- | weed/topology/cluster_commands.go | 19 |
4 files changed, 183 insertions, 7 deletions
diff --git a/weed/command/master.go b/weed/command/master.go index e56ee19fe..459d3e1cb 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,6 +1,7 @@ package command import ( + "context" "net/http" "os" "sort" @@ -48,6 +49,8 @@ type MasterOptions struct { metricsHttpPort *int heartbeatInterval *time.Duration electionTimeout *time.Duration + raftHashicorp *bool + raftBootstrap *bool } func init() { @@ -71,6 +74,8 @@ func init() { m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server") m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers") + m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft") + m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") } var cmdMaster = &Command{ @@ -156,13 +161,24 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { RaftResumeState: *masterOption.raftResumeState, HeartbeatInterval: *masterOption.heartbeatInterval, ElectionTimeout: *masterOption.electionTimeout, + RaftBootstrap: *m.raftBootstrap, + } + var raftServer *weed_server.RaftServer + var err error + if *m.raftHashicorp { + ctx := context.Background() + raftServer, err = weed_server.NewHashicorpRaftServer(ctx, raftServerOption) + } else { + raftServer, err = weed_server.NewRaftServer(raftServerOption) } - raftServer, err := weed_server.NewRaftServer(raftServerOption) if raftServer == nil { glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) } ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") + if *m.raftHashicorp { + //r.HandleFunc("/raft/stats", raftServer.).Methods("GET") + } // starting grpc server grpcPort := *masterOption.portGrpc grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0) @@ -171,7 +187,11 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) - protobuf.RegisterRaftServer(grpcS, raftServer) + if *m.raftHashicorp { + raftServer.TransportManager.Register(grpcS) + } else { + protobuf.RegisterRaftServer(grpcS, raftServer) + } reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort) if grpcLocalL != nil { diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go new file mode 100644 index 000000000..caef42f62 --- /dev/null +++ b/weed/server/raft_hashicorp.go @@ -0,0 +1,98 @@ +package weed_server + +// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28 +// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18 + +import ( + "context" + "fmt" + transport "github.com/Jille/raft-grpc-transport" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/hashicorp/raft" + boltdb "github.com/hashicorp/raft-boltdb" + "google.golang.org/grpc" + "math/rand" + "os" + "path/filepath" + "time" +) + +func NewHashicorpRaftServer(ctx context.Context, option *RaftServerOption) (*RaftServer, error) { + s := &RaftServer{ + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, + } + + c := raft.DefaultConfig() + c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change + c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + c.ElectionTimeout = option.ElectionTimeout + if glog.V(4) { + c.Logger.SetLevel(1) + } else if glog.V(3) { + c.Logger.SetLevel(2) + } else if glog.V(2) { + c.Logger.SetLevel(3) + } else if glog.V(1) { + c.Logger.SetLevel(4) + } else if glog.V(0) { + c.Logger.SetLevel(5) + } + + baseDir := s.dataDir + + ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat")) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err) + } + + sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat")) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err) + } + + fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr) + if err != nil { + return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) + } + + // s.GrpcServer = raft.NewGrpcServer(s.raftServer) + s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption}) + + stateMachine := StateMachine{topo: option.Topo} + r, err := raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport()) + if err != nil { + return nil, fmt.Errorf("raft.NewRaft: %v", err) + } + + if option.RaftBootstrap { + cfg := raft.Configuration{ + Servers: []raft.Server{ + { + Suffrage: raft.Voter, + ID: c.LocalID, + Address: raft.ServerAddress(s.serverAddr), + }, + }, + } + // Add known peers to bootstrap + for _, node := range option.Peers { + if node == option.ServerAddr { + continue + } + cfg.Servers = append(cfg.Servers, raft.Server{ + Suffrage: raft.Voter, + ID: raft.ServerID(node), + Address: raft.ServerAddress(node), + }) + } + f := r.BootstrapCluster(cfg) + if err := f.Error(); err != nil { + return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) + } + } + + return s, nil +} diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index d559cb691..5ed3724e2 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,6 +2,9 @@ package weed_server import ( "encoding/json" + transport "github.com/Jille/raft-grpc-transport" + "io" + "io/ioutil" "math/rand" "os" "path" @@ -12,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" @@ -26,14 +30,17 @@ type RaftServerOption struct { RaftResumeState bool HeartbeatInterval time.Duration ElectionTimeout time.Duration + RaftBootstrap bool } type RaftServer struct { - peers map[string]pb.ServerAddress // initial peers to join with - raftServer raft.Server - dataDir string - serverAddr pb.ServerAddress - topo *topology.Topology + peers map[string]pb.ServerAddress // initial peers to join with + raftServer raft.Server + raftHashicorp hashicorpRaft.Raft + TransportManager *transport.Manager + dataDir string + serverAddr pb.ServerAddress + topo *topology.Topology *raft.GrpcServer } @@ -42,6 +49,8 @@ type StateMachine struct { topo *topology.Topology } +var _ hashicorpRaft.FSM = &StateMachine{} + func (s StateMachine) Save() ([]byte, error) { state := topology.MaxVolumeIdCommand{ MaxVolumeId: s.topo.GetMaxVolumeId(), @@ -61,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error { return nil } +func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} { + before := s.topo.GetMaxVolumeId() + state := topology.MaxVolumeIdCommand{} + err := json.Unmarshal(l.Data, &state) + if err != nil { + return err + } + s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + + glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId()) + return nil +} + +func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) { + return &topology.MaxVolumeIdCommand{ + MaxVolumeId: s.topo.GetMaxVolumeId(), + }, nil +} + +func (s *StateMachine) Restore(r io.ReadCloser) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return err + } + if err := s.Recovery(b); err != nil { + return err + } + return nil +} + func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go index 152691ccb..951de0711 100644 --- a/weed/topology/cluster_commands.go +++ b/weed/topology/cluster_commands.go @@ -1,9 +1,12 @@ package topology import ( + "encoding/json" + "fmt" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle" + hashicorpRaft "github.com/hashicorp/raft" ) type MaxVolumeIdCommand struct { @@ -29,3 +32,19 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { return nil, nil } + +func (s *MaxVolumeIdCommand) Persist(sink hashicorpRaft.SnapshotSink) error { + b, err := json.Marshal(s) + if err != nil { + return fmt.Errorf("marshal: %v", err) + } + _, err = sink.Write(b) + if err != nil { + sink.Cancel() + return fmt.Errorf("sink.Write(): %v", err) + } + return sink.Close() +} + +func (s *MaxVolumeIdCommand) Release() { +} |
