aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_grpc_server_raft.go66
-rw-r--r--weed/server/master_server.go108
-rw-r--r--weed/server/raft_hashicorp.go182
-rw-r--r--weed/server/raft_server.go64
-rw-r--r--weed/server/raft_server_handlers.go8
5 files changed, 406 insertions, 22 deletions
diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go
new file mode 100644
index 000000000..37491b3df
--- /dev/null
+++ b/weed/server/master_grpc_server_raft.go
@@ -0,0 +1,66 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/hashicorp/raft"
+)
+
+func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) {
+ resp := &master_pb.RaftListClusterServersResponse{}
+
+ servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
+
+ for _, server := range servers {
+ resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
+ Id: string(server.ID),
+ Address: string(server.Address),
+ Suffrage: server.Suffrage.String(),
+ })
+ }
+ return resp, nil
+}
+
+func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) {
+ resp := &master_pb.RaftAddServerResponse{}
+ if ms.Topo.HashicorpRaft.State() != raft.Leader {
+ return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
+ }
+
+ var idxFuture raft.IndexFuture
+ if req.Voter {
+ idxFuture = ms.Topo.HashicorpRaft.AddVoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
+ } else {
+ idxFuture = ms.Topo.HashicorpRaft.AddNonvoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
+ }
+
+ if err := idxFuture.Error(); err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
+
+func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) {
+ resp := &master_pb.RaftRemoveServerResponse{}
+
+ if ms.Topo.HashicorpRaft.State() != raft.Leader {
+ return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
+ }
+
+ if !req.Force {
+ ms.clientChansLock.RLock()
+ _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)]
+ ms.clientChansLock.RUnlock()
+ if ok {
+ return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id)
+ }
+ }
+
+ idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0)
+ if err := idxFuture.Error(); err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index b63e3a418..9b1ebc2d4 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
@@ -17,6 +18,7 @@ import (
"github.com/chrislusf/raft"
"github.com/gorilla/mux"
+ hashicorpRaft "github.com/hashicorp/raft"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -30,8 +32,9 @@ import (
)
const (
- SequencerType = "master.sequencer.type"
- SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ SequencerType = "master.sequencer.type"
+ SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ RaftServerRemovalTime = 72 * time.Minute
)
type MasterOption struct {
@@ -62,6 +65,9 @@ type MasterServer struct {
boundedLeaderChan chan int
+ onPeerUpdatDoneCn chan string
+ onPeerUpdatDoneCnExist bool
+
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.KeepConnectedResponse
@@ -112,6 +118,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
+ ms.onPeerUpdatDoneCn = make(chan string)
+
+ ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
seq := ms.createSequencer(option)
if nil == seq {
@@ -160,19 +169,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
}
// SetRaftServer wires the topology to whichever raft implementation the
// RaftServer was built with: the legacy goraft server (raftServer) or
// hashicorp/raft (RaftHashicorp). Leader changes are logged and counted in
// MasterLeaderChangeCounter; for hashicorp/raft this is done by a background
// goroutine watching LeaderCh.
// NOTE(review): that goroutine has no shutdown path and runs for the process
// lifetime; if LeaderCh were ever closed the single-case select would spin
// hot on the zero value — confirm LeaderCh never closes.
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
	var raftServerName string
	if raftServer.raftServer != nil {
		// Legacy goraft: register a leader-change listener.
		ms.Topo.RaftServer = raftServer.raftServer
		ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
			glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
			stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
			if ms.Topo.RaftServer.Leader() != "" {
				glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
			}
		})
		raftServerName = ms.Topo.RaftServer.Name()
	} else if raftServer.RaftHashicorp != nil {
		// hashicorp/raft: watch the leadership channel in the background.
		ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
		leaderCh := raftServer.RaftHashicorp.LeaderCh()
		prevLeader := ms.Topo.HashicorpRaft.Leader()
		go func() {
			for {
				select {
				case isLeader := <-leaderCh:
					leader := ms.Topo.HashicorpRaft.Leader()
					glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
					stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
					prevLeader = leader
				}
			}
		}()
		raftServerName = ms.Topo.HashicorpRaft.String()
	}
	// Log the initial leadership state at startup.
	if ms.Topo.IsLeader() {
		glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
	} else {
		if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
			glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
		} else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
			glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
		}
	}
}
@@ -301,3 +332,56 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
}
return seq
}
+
// OnPeerUpdate reacts to master-cluster membership changes reported through
// the master client. On the leader, an added master is mirrored into the
// hashicorp/raft configuration immediately; a removed master is evicted only
// after RaftServerRemovalTime, and the eviction is cancelled if the peer
// reappears in the meantime. No-op for non-master nodes or when
// hashicorp/raft is not in use.
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
	glog.V(2).Infof("OnPeerUpdate: %+v", update)
	if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
		return
	}
	peerAddress := pb.ServerAddress(update.Address)
	peerName := string(peerAddress)
	isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
	if update.IsAdd {
		if isLeader {
			// Only add the peer if the raft configuration does not already
			// contain it.
			raftServerFound := false
			for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
				if string(server.ID) == peerName {
					raftServerFound = true
				}
			}
			if !raftServerFound {
				glog.V(0).Infof("adding new raft server: %s", peerName)
				ms.Topo.HashicorpRaft.AddVoter(
					hashicorpRaft.ServerID(peerName),
					hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
			}
		}
		// Cancel a pending delayed removal for this peer, if one is running.
		// NOTE(review): onPeerUpdatDoneCn is unbuffered and
		// onPeerUpdatDoneCnExist is never reset to false, so once all removal
		// goroutines have exited (e.g. via the timeout below) this send has
		// no receiver and blocks forever — verify and consider a buffered
		// channel or per-peer cancellation.
		if ms.onPeerUpdatDoneCnExist {
			ms.onPeerUpdatDoneCn <- peerName
		}
	} else if isLeader {
		// Defer the raft removal so a briefly-disconnected master is not
		// evicted; the goroutine is cancelled if the same peer is re-added.
		go func(peerName string) {
			for {
				select {
				case <-time.After(RaftServerRemovalTime):
					// Grace period elapsed: ask the current leader (via the
					// master client, which follows leadership changes) to
					// remove the peer. Force is false so a reconnected peer
					// is still protected by RaftRemoveServer's check.
					err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
						_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
							Id:    peerName,
							Force: false,
						})
						return err
					})
					if err != nil {
						glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
					}
					return
				case peerDone := <-ms.onPeerUpdatDoneCn:
					// A completion signal for a different peer is consumed and
					// dropped here; only our own peer name cancels this timer.
					if peerName == peerDone {
						return
					}
				}
			}
		}(peerName)
		ms.onPeerUpdatDoneCnExist = true
	}
}
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
new file mode 100644
index 000000000..885ffdcc7
--- /dev/null
+++ b/weed/server/raft_hashicorp.go
@@ -0,0 +1,182 @@
+package weed_server
+
+// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28
+// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18
+
+import (
+ "fmt"
+ transport "github.com/Jille/raft-grpc-transport"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/hashicorp/raft"
+ boltdb "github.com/hashicorp/raft-boltdb"
+ "google.golang.org/grpc"
+ "math/rand"
+ "os"
+ "path"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+)
+
+const (
+ ldbFile = "logs.dat"
+ sdbFile = "stable.dat"
+ updatePeersTimeout = 15 * time.Minute
+)
+
+func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int {
+ peers := make([]pb.ServerAddress, 0, len(mapPeers))
+ for _, peer := range mapPeers {
+ peers = append(peers, peer)
+ }
+ sort.Slice(peers, func(i, j int) bool {
+ return strings.Compare(string(peers[i]), string(peers[j])) < 0
+ })
+ for i, peer := range peers {
+ if string(peer) == string(self) {
+ return i
+ }
+ }
+ return -1
+}
+
+func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) {
+ for _, peer := range s.peers {
+ cfg.Servers = append(cfg.Servers, raft.Server{
+ Suffrage: raft.Voter,
+ ID: raft.ServerID(peer.String()),
+ Address: raft.ServerAddress(peer.ToGrpcAddress()),
+ })
+ }
+ return cfg
+}
+
+func (s *RaftServer) UpdatePeers() {
+ for {
+ select {
+ case isLeader := <-s.RaftHashicorp.LeaderCh():
+ if isLeader {
+ peerLeader := string(s.serverAddr)
+ existsPeerName := make(map[string]bool)
+ for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerLeader {
+ continue
+ }
+ existsPeerName[string(server.ID)] = true
+ }
+ for _, peer := range s.peers {
+ if peer.String() == peerLeader || existsPeerName[peer.String()] {
+ continue
+ }
+ glog.V(0).Infof("adding new peer: %s", peer.String())
+ s.RaftHashicorp.AddVoter(
+ raft.ServerID(peer.String()), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
+ }
+ for peer, _ := range existsPeerName {
+ if _, found := s.peers[peer]; !found {
+ glog.V(0).Infof("removing old peer: %s", peer)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
+ }
+ }
+ if _, found := s.peers[peerLeader]; !found {
+ glog.V(0).Infof("removing old leader peer: %s", peerLeader)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+ }
+ }
+ return
+ case <-time.After(updatePeersTimeout):
+ return
+ }
+ }
+}
+
+func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
+ s := &RaftServer{
+ peers: option.Peers,
+ serverAddr: option.ServerAddr,
+ dataDir: option.DataDir,
+ topo: option.Topo,
+ }
+
+ c := raft.DefaultConfig()
+ c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change
+ c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
+ c.ElectionTimeout = option.ElectionTimeout
+ if c.LeaderLeaseTimeout > c.HeartbeatTimeout {
+ c.LeaderLeaseTimeout = c.HeartbeatTimeout
+ }
+ if glog.V(4) {
+ c.LogLevel = "Debug"
+ } else if glog.V(2) {
+ c.LogLevel = "Info"
+ } else if glog.V(1) {
+ c.LogLevel = "Warn"
+ } else if glog.V(0) {
+ c.LogLevel = "Error"
+ }
+
+ if option.RaftBootstrap {
+ os.RemoveAll(path.Join(s.dataDir, ldbFile))
+ os.RemoveAll(path.Join(s.dataDir, sdbFile))
+ os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+ }
+ baseDir := s.dataDir
+
+ ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, ldbFile))
+ if err != nil {
+ return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
+ }
+
+ sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, sdbFile))
+ if err != nil {
+ return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
+ }
+
+ fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
+ if err != nil {
+ return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
+ }
+
+ s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption})
+
+ stateMachine := StateMachine{topo: option.Topo}
+ s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport())
+ if err != nil {
+ return nil, fmt.Errorf("raft.NewRaft: %v", err)
+ }
+ if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 {
+ cfg := s.AddPeersConfiguration()
+ // Need to get lock, in case all servers do this at the same time.
+ peerIdx := getPeerIdx(s.serverAddr, s.peers)
+ timeSpeep := time.Duration(float64(c.LeaderLeaseTimeout) * (rand.Float64()*0.25 + 1) * float64(peerIdx))
+ glog.V(0).Infof("Bootstrapping idx: %d sleep: %v new cluster: %+v", peerIdx, timeSpeep, cfg)
+ time.Sleep(timeSpeep)
+ f := s.RaftHashicorp.BootstrapCluster(cfg)
+ if err := f.Error(); err != nil {
+ return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
+ }
+ } else {
+ go s.UpdatePeers()
+ }
+
+ ticker := time.NewTicker(c.HeartbeatTimeout * 10)
+ if glog.V(4) {
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ cfuture := s.RaftHashicorp.GetConfiguration()
+ if err = cfuture.Error(); err != nil {
+ glog.Fatalf("error getting config: %s", err)
+ }
+ configuration := cfuture.Configuration()
+ glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers)
+ }
+ }
+ }()
+ }
+
+ return s, nil
+}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index d559cb691..8c372f0cc 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,6 +2,9 @@ package weed_server
import (
"encoding/json"
+ transport "github.com/Jille/raft-grpc-transport"
+ "io"
+ "io/ioutil"
"math/rand"
"os"
"path"
@@ -12,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
@@ -26,14 +30,17 @@ type RaftServerOption struct {
RaftResumeState bool
HeartbeatInterval time.Duration
ElectionTimeout time.Duration
+ RaftBootstrap bool
}
type RaftServer struct {
- peers map[string]pb.ServerAddress // initial peers to join with
- raftServer raft.Server
- dataDir string
- serverAddr pb.ServerAddress
- topo *topology.Topology
+ peers map[string]pb.ServerAddress // initial peers to join with
+ raftServer raft.Server
+ RaftHashicorp *hashicorpRaft.Raft
+ TransportManager *transport.Manager
+ dataDir string
+ serverAddr pb.ServerAddress
+ topo *topology.Topology
*raft.GrpcServer
}
@@ -42,6 +49,8 @@ type StateMachine struct {
topo *topology.Topology
}
+var _ hashicorpRaft.FSM = &StateMachine{}
+
func (s StateMachine) Save() ([]byte, error) {
state := topology.MaxVolumeIdCommand{
MaxVolumeId: s.topo.GetMaxVolumeId(),
@@ -61,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
+func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
+ before := s.topo.GetMaxVolumeId()
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(l.Data, &state)
+ if err != nil {
+ return err
+ }
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+
+ glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
+ return nil
+}
+
+func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
+ return &topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }, nil
+}
+
+func (s *StateMachine) Restore(r io.ReadCloser) error {
+ b, err := ioutil.ReadAll(r)
+ if err != nil {
+ return err
+ }
+ if err := s.Recovery(b); err != nil {
+ return err
+ }
+ return nil
+}
+
func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,
@@ -132,12 +171,17 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
}
func (s *RaftServer) Peers() (members []string) {
- peers := s.raftServer.Peers()
-
- for _, p := range peers {
- members = append(members, p.Name)
+ if s.raftServer != nil {
+ peers := s.raftServer.Peers()
+ for _, p := range peers {
+ members = append(members, p.Name)
+ }
+ } else if s.RaftHashicorp != nil {
+ cfg := s.RaftHashicorp.GetConfiguration()
+ for _, p := range cfg.Configuration().Servers {
+ members = append(members, string(p.ID))
+ }
}
-
return
}
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index 7e58f1e92..cc3e6e37f 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -25,3 +25,11 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
}
writeJsonQuiet(w, r, http.StatusOK, ret)
}
+
+func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) {
+ if s.RaftHashicorp == nil {
+ writeJsonQuiet(w, r, http.StatusNotFound, nil)
+ return
+ }
+ writeJsonQuiet(w, r, http.StatusOK, s.RaftHashicorp.Stats())
+}