aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docker/Makefile3
-rw-r--r--docker/compose/local-hashicorp-raft-compose.yml89
-rw-r--r--go.mod11
-rw-r--r--weed/command/master.go45
-rw-r--r--weed/server/master_server.go42
-rw-r--r--weed/server/raft_hashicorp.go146
-rw-r--r--weed/server/raft_server.go64
-rw-r--r--weed/server/raft_server_handlers.go8
-rw-r--r--weed/topology/cluster_commands.go20
-rw-r--r--weed/topology/topology.go25
10 files changed, 419 insertions, 34 deletions
diff --git a/docker/Makefile b/docker/Makefile
index 446bb5b47..76cdf75c8 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -55,6 +55,9 @@ cluster: build
2clusters: build
docker-compose -f compose/local-clusters-compose.yml -p seaweedfs up
+hashicorp_raft: build
+ docker-compose -f compose/local-hashicorp-raft-compose.yml -p seaweedfs up
+
s3tests: build s3tests_build
docker-compose -f compose/local-s3tests-compose.yml -p seaweedfs up
diff --git a/docker/compose/local-hashicorp-raft-compose.yml b/docker/compose/local-hashicorp-raft-compose.yml
new file mode 100644
index 000000000..69e80a446
--- /dev/null
+++ b/docker/compose/local-hashicorp-raft-compose.yml
@@ -0,0 +1,89 @@
+version: '2'
+
+services:
+ master0:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 9333:9333
+ - 19333:19333
+ command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -ip=master0 -port=9333 -peers=master1:9334,master2:9335 -mdir=/data"
+ volumes:
+ - ./master/0:/data
+ environment:
+ WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
+ WEED_MASTER_VOLUME_GROWTH_COPY_2: 2
+ WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1
+ master1:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 9334:9334
+ - 19334:19334
+ command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -ip=master1 -port=9334 -peers=master0:9333,master2:9335 -mdir=/data"
+ volumes:
+ - ./master/1:/data
+ environment:
+ WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
+ WEED_MASTER_VOLUME_GROWTH_COPY_2: 2
+ WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1
+ master2:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 9335:9335
+ - 19335:19335
+ command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -ip=master2 -port=9335 -peers=master0:9333,master1:9334 -mdir=/data"
+ volumes:
+ - ./master/2:/data
+ environment:
+ WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
+ WEED_MASTER_VOLUME_GROWTH_COPY_2: 2
+ WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1
+ volume1:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8080:8080
+ - 18080:18080
+ command: 'volume -dataCenter=dc1 -rack=v1 -mserver="master0:9333,master1:9334,master2:9335" -port=8080 -ip=volume1 -publicUrl=localhost:8080 -preStopSeconds=1'
+ depends_on:
+ - master0
+ - master1
+ volume2:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8082:8082
+ - 18082:18082
+ command: 'volume -dataCenter=dc2 -rack=v2 -mserver="master0:9333,master1:9334,master2:9335" -port=8082 -ip=volume2 -publicUrl=localhost:8082 -preStopSeconds=1'
+ depends_on:
+ - master0
+ - master1
+ volume3:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8083:8083
+ - 18083:18083
+ command: 'volume -dataCenter=dc3 -rack=v3 -mserver="master0:9333,master1:9334,master2:9335" -port=8083 -ip=volume3 -publicUrl=localhost:8083 -preStopSeconds=1'
+ depends_on:
+ - master0
+ - master1
+ filer:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8888:8888
+ - 18888:18888
+ - 8111:8111
+ command: 'filer -defaultReplicaPlacement=100 -iam -master="master0:9333,master1:9334,master2:9335"'
+ depends_on:
+ - master0
+ - master1
+ - volume1
+ - volume2
+ s3:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8333:8333
+ command: '-v=9 s3 -ip.bind="s3" -filer="filer:8888"'
+ depends_on:
+ - master0
+ - master1
+ - volume1
+ - volume2
+ - filer \ No newline at end of file
diff --git a/go.mod b/go.mod
index 64b82671f..50e5ae44c 100644
--- a/go.mod
+++ b/go.mod
@@ -155,6 +155,8 @@ require (
require (
cloud.google.com/go/compute v1.5.0 // indirect
cloud.google.com/go/iam v0.3.0 // indirect
+ github.com/Jille/raft-grpc-transport v1.2.0 // indirect
+ github.com/armon/go-metrics v0.3.10 // indirect
github.com/aws/aws-sdk-go-v2 v1.16.2 // indirect
github.com/aws/aws-sdk-go-v2/config v1.15.3 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.11.2 // indirect
@@ -168,16 +170,25 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 // indirect
github.com/aws/smithy-go v1.11.2 // indirect
+ github.com/boltdb/bolt v1.3.1 // indirect
github.com/d4l3k/messagediff v1.2.1 // indirect
+ github.com/fatih/color v1.13.0 // indirect
github.com/fclairamb/go-log v0.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8 // indirect
+ github.com/hashicorp/go-hclog v1.0.0 // indirect
+ github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
+ github.com/hashicorp/go-msgpack v1.1.5 // indirect
+ github.com/hashicorp/golang-lru v0.5.4 // indirect
+ github.com/hashicorp/raft v1.3.6 // indirect
+ github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.6 // indirect
+ github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-runewidth v0.0.7 // indirect
github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d // indirect
diff --git a/weed/command/master.go b/weed/command/master.go
index 3d69f4216..37cfaf252 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -48,6 +48,8 @@ type MasterOptions struct {
metricsHttpPort *int
heartbeatInterval *time.Duration
electionTimeout *time.Duration
+ raftHashicorp *bool
+ raftBootstrap *bool
}
func init() {
@@ -71,6 +73,8 @@ func init() {
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers")
+ m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft")
+ m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
}
var cmdMaster = &Command{
@@ -156,13 +160,25 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
RaftResumeState: *masterOption.raftResumeState,
HeartbeatInterval: *masterOption.heartbeatInterval,
ElectionTimeout: *masterOption.electionTimeout,
+ RaftBootstrap: *m.raftBootstrap,
}
- raftServer, err := weed_server.NewRaftServer(raftServerOption)
- if raftServer == nil {
- glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
+ var raftServer *weed_server.RaftServer
+ var err error
+ if *m.raftHashicorp {
+ if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil {
+ glog.Fatalf("NewHashicorpRaftServer: %s", err)
+ }
+ } else {
+ raftServer, err = weed_server.NewRaftServer(raftServerOption)
+ if raftServer == nil {
+ glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
+ }
}
ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
+ if *m.raftHashicorp {
+ r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
+ }
// starting grpc server
grpcPort := *masterOption.portGrpc
grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0)
@@ -171,7 +187,11 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
- protobuf.RegisterRaftServer(grpcS, raftServer)
+ if *m.raftHashicorp {
+ raftServer.TransportManager.Register(grpcS)
+ } else {
+ protobuf.RegisterRaftServer(grpcS, raftServer)
+ }
reflection.Register(grpcS)
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort)
if grpcLocalL != nil {
@@ -179,14 +199,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
go grpcS.Serve(grpcL)
- go func() {
- time.Sleep(1500 * time.Millisecond)
- if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
- if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
- raftServer.DoJoinCommand()
+ timeSleep := 1500 * time.Millisecond
+ if !*m.raftHashicorp {
+ go func() {
+ time.Sleep(timeSleep)
+ if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
+ if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
+ raftServer.DoJoinCommand()
+ }
}
- }
- }()
+ }()
+ }
go ms.MasterClient.KeepConnectedToMaster()
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index b63e3a418..37e7f245c 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -160,19 +160,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
}
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
- ms.Topo.RaftServer = raftServer.raftServer
- ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
- stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
- if ms.Topo.RaftServer.Leader() != "" {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
- }
- })
+ var raftServerName string
+ if raftServer.raftServer != nil {
+ ms.Topo.RaftServer = raftServer.raftServer
+ ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
+ glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
+ if ms.Topo.RaftServer.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
+ }
+ })
+ raftServerName = ms.Topo.RaftServer.Name()
+ } else if raftServer.RaftHashicorp != nil {
+ ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
+ leaderCh := raftServer.RaftHashicorp.LeaderCh()
+ prevLeader := ms.Topo.HashicorpRaft.Leader()
+ go func() {
+ for {
+ select {
+ case isLeader := <-leaderCh:
+ leader := ms.Topo.HashicorpRaft.Leader()
+ glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
+ prevLeader = leader
+ }
+ }
+ }()
+ raftServerName = ms.Topo.HashicorpRaft.String()
+ }
if ms.Topo.IsLeader() {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
+ glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
} else {
- if ms.Topo.RaftServer.Leader() != "" {
+ if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
+ } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
}
}
}
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
new file mode 100644
index 000000000..e19839922
--- /dev/null
+++ b/weed/server/raft_hashicorp.go
@@ -0,0 +1,146 @@
+package weed_server
+
+// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28
+// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18
+
+import (
+ "fmt"
+ transport "github.com/Jille/raft-grpc-transport"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/hashicorp/raft"
+ boltdb "github.com/hashicorp/raft-boltdb"
+ "google.golang.org/grpc"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "time"
+)
+
+func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) {
+ for _, peer := range s.peers {
+ cfg.Servers = append(cfg.Servers, raft.Server{
+ Suffrage: raft.Voter,
+ ID: raft.ServerID(peer.String()),
+ Address: raft.ServerAddress(peer.ToGrpcAddress()),
+ })
+ }
+ return cfg
+}
+
+func (s *RaftServer) UpdatePeers() {
+ for {
+ select {
+ case isLeader := <-s.RaftHashicorp.LeaderCh():
+ if isLeader {
+ peerLeader := s.serverAddr.String()
+ existsPeerName := make(map[string]bool)
+ for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerLeader {
+ continue
+ }
+ existsPeerName[string(server.ID)] = true
+ }
+ for _, peer := range s.peers {
+ if peer.String() == peerLeader || existsPeerName[peer.String()] {
+ continue
+ }
+ glog.V(0).Infof("adding new peer: %s", peer.String())
+ s.RaftHashicorp.AddVoter(
+ raft.ServerID(peer.String()), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
+ }
+ for peer := range existsPeerName {
+ if _, found := s.peers[peer]; !found {
+ glog.V(0).Infof("removing old peer: %s", peer)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
+ }
+ }
+ if _, found := s.peers[peerLeader]; !found {
+ glog.V(0).Infof("removing old leader peer: %s", peerLeader)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+ }
+ }
+ break
+ }
+ }
+}
+
+func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
+ s := &RaftServer{
+ peers: option.Peers,
+ serverAddr: option.ServerAddr,
+ dataDir: option.DataDir,
+ topo: option.Topo,
+ }
+
+ c := raft.DefaultConfig()
+ c.LocalID = raft.ServerID(s.serverAddr.String()) // TODO: maybe the IP:port address will change
+ c.NoSnapshotRestoreOnStart = option.RaftResumeState
+ c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
+ c.ElectionTimeout = option.ElectionTimeout
+ if c.LeaderLeaseTimeout > c.HeartbeatTimeout {
+ c.LeaderLeaseTimeout = c.HeartbeatTimeout
+ }
+ if glog.V(4) {
+ c.LogLevel = "Debug"
+ } else if glog.V(2) {
+ c.LogLevel = "Info"
+ } else if glog.V(1) {
+ c.LogLevel = "Warn"
+ } else if glog.V(0) {
+ c.LogLevel = "Error"
+ }
+
+ baseDir := s.dataDir
+
+ ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat"))
+ if err != nil {
+ return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
+ }
+
+ sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat"))
+ if err != nil {
+ return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
+ }
+
+ fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
+ if err != nil {
+ return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
+ }
+
+ s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption})
+
+ stateMachine := StateMachine{topo: option.Topo}
+ s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport())
+ if err != nil {
+ return nil, fmt.Errorf("raft.NewRaft: %v", err)
+ }
+ if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 {
+ cfg := s.AddPeersConfiguration()
+ glog.V(0).Infof("Bootstrapping new cluster %+v", cfg)
+ f := s.RaftHashicorp.BootstrapCluster(cfg)
+ if err := f.Error(); err != nil {
+ return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
+ }
+ } else {
+ go s.UpdatePeers()
+ }
+
+ ticker := time.NewTicker(c.HeartbeatTimeout * 10)
+ if glog.V(4) {
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ cfuture := s.RaftHashicorp.GetConfiguration()
+ if err = cfuture.Error(); err != nil {
+ glog.Fatalf("error getting config: %s", err)
+ }
+ configuration := cfuture.Configuration()
+ glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers)
+ }
+ }
+ }()
+ }
+
+ return s, nil
+}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index d559cb691..8c372f0cc 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,6 +2,9 @@ package weed_server
import (
"encoding/json"
+ transport "github.com/Jille/raft-grpc-transport"
+ "io"
+ "io/ioutil"
"math/rand"
"os"
"path"
@@ -12,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
@@ -26,14 +30,17 @@ type RaftServerOption struct {
RaftResumeState bool
HeartbeatInterval time.Duration
ElectionTimeout time.Duration
+ RaftBootstrap bool
}
type RaftServer struct {
- peers map[string]pb.ServerAddress // initial peers to join with
- raftServer raft.Server
- dataDir string
- serverAddr pb.ServerAddress
- topo *topology.Topology
+ peers map[string]pb.ServerAddress // initial peers to join with
+ raftServer raft.Server
+ RaftHashicorp *hashicorpRaft.Raft
+ TransportManager *transport.Manager
+ dataDir string
+ serverAddr pb.ServerAddress
+ topo *topology.Topology
*raft.GrpcServer
}
@@ -42,6 +49,8 @@ type StateMachine struct {
topo *topology.Topology
}
+var _ hashicorpRaft.FSM = &StateMachine{}
+
func (s StateMachine) Save() ([]byte, error) {
state := topology.MaxVolumeIdCommand{
MaxVolumeId: s.topo.GetMaxVolumeId(),
@@ -61,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
+func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
+ before := s.topo.GetMaxVolumeId()
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(l.Data, &state)
+ if err != nil {
+ return err
+ }
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+
+ glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
+ return nil
+}
+
+func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
+ return &topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }, nil
+}
+
+func (s *StateMachine) Restore(r io.ReadCloser) error {
+ b, err := ioutil.ReadAll(r)
+ if err != nil {
+ return err
+ }
+ if err := s.Recovery(b); err != nil {
+ return err
+ }
+ return nil
+}
+
func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,
@@ -132,12 +171,17 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
}
func (s *RaftServer) Peers() (members []string) {
- peers := s.raftServer.Peers()
-
- for _, p := range peers {
- members = append(members, p.Name)
+ if s.raftServer != nil {
+ peers := s.raftServer.Peers()
+ for _, p := range peers {
+ members = append(members, p.Name)
+ }
+ } else if s.RaftHashicorp != nil {
+ cfg := s.RaftHashicorp.GetConfiguration()
+ for _, p := range cfg.Configuration().Servers {
+ members = append(members, string(p.ID))
+ }
}
-
return
}
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index 7e58f1e92..cc3e6e37f 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -25,3 +25,11 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
}
writeJsonQuiet(w, r, http.StatusOK, ret)
}
+
+func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) {
+ if s.RaftHashicorp == nil {
+ writeJsonQuiet(w, r, http.StatusNotFound, nil)
+ return
+ }
+ writeJsonQuiet(w, r, http.StatusOK, s.RaftHashicorp.Stats())
+}
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go
index 152691ccb..1bcc6b449 100644
--- a/weed/topology/cluster_commands.go
+++ b/weed/topology/cluster_commands.go
@@ -1,9 +1,12 @@
package topology
import (
+ "encoding/json"
+ "fmt"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ hashicorpRaft "github.com/hashicorp/raft"
)
type MaxVolumeIdCommand struct {
@@ -20,6 +23,7 @@ func (c *MaxVolumeIdCommand) CommandName() string {
return "MaxVolumeId"
}
+// deprecatedCommandApply represents the old interface to apply a command to the server.
func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
topo := server.Context().(*Topology)
before := topo.GetMaxVolumeId()
@@ -29,3 +33,19 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
return nil, nil
}
+
+func (s *MaxVolumeIdCommand) Persist(sink hashicorpRaft.SnapshotSink) error {
+ b, err := json.Marshal(s)
+ if err != nil {
+ return fmt.Errorf("marshal: %v", err)
+ }
+ _, err = sink.Write(b)
+ if err != nil {
+ sink.Cancel()
+ return fmt.Errorf("sink.Write(): %v", err)
+ }
+ return sink.Close()
+}
+
+func (s *MaxVolumeIdCommand) Release() {
+}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 207c89ad7..d636e8a9e 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -1,6 +1,7 @@
package topology
import (
+ "encoding/json"
"errors"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -10,6 +11,7 @@ import (
"time"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -40,7 +42,8 @@ type Topology struct {
Configuration *Configuration
- RaftServer raft.Server
+ RaftServer raft.Server
+ HashicorpRaft *hashicorpRaft.Raft
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -76,6 +79,10 @@ func (t *Topology) IsLeader() bool {
return true
}
}
+ } else if t.HashicorpRaft != nil {
+ if t.HashicorpRaft.State() == hashicorpRaft.Leader {
+ return true
+ }
}
return false
}
@@ -85,6 +92,8 @@ func (t *Topology) Leader() (pb.ServerAddress, error) {
for count := 0; count < 3; count++ {
if t.RaftServer != nil {
l = pb.ServerAddress(t.RaftServer.Leader())
+ } else if t.HashicorpRaft != nil {
+ l = pb.ServerAddress(t.HashicorpRaft.Leader())
} else {
return "", errors.New("Raft Server not ready yet!")
}
@@ -124,8 +133,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
- if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
- return 0, err
+ if t.RaftServer != nil {
+ if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
+ return 0, err
+ }
+ } else if t.HashicorpRaft != nil {
+ b, err := json.Marshal(NewMaxVolumeIdCommand(next))
+ if err != nil {
+ return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err)
+ }
+ if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil {
+ return 0, future.Error()
+ }
}
return next, nil
}