aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-04-04 17:51:51 +0500
committerKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-04-04 17:51:51 +0500
commit14dd97189011d2a8802d5c9cc1726802cf19f2b2 (patch)
tree8f25cc1b33e27c96260b491de66057f9c4d44a65 /weed
parentc514710b7b0454c7a070ebf30aa865c9a5ad1591 (diff)
downloadseaweedfs-14dd97189011d2a8802d5c9cc1726802cf19f2b2.tar.xz
seaweedfs-14dd97189011d2a8802d5c9cc1726802cf19f2b2.zip
hashicorp raft with state machine
Diffstat (limited to 'weed')
-rw-r--r--weed/command/master.go29
-rw-r--r--weed/server/master_server.go42
-rw-r--r--weed/server/raft_hashicorp.go49
-rw-r--r--weed/server/raft_server.go2
-rw-r--r--weed/topology/cluster_commands.go1
-rw-r--r--weed/topology/topology.go25
6 files changed, 104 insertions, 44 deletions
diff --git a/weed/command/master.go b/weed/command/master.go
index 459d3e1cb..bbf9b1de2 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,7 +1,6 @@
package command
import (
- "context"
"net/http"
"os"
"sort"
@@ -166,13 +165,14 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
var raftServer *weed_server.RaftServer
var err error
if *m.raftHashicorp {
- ctx := context.Background()
- raftServer, err = weed_server.NewHashicorpRaftServer(ctx, raftServerOption)
+ if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil {
+ glog.Fatalf("NewHashicorpRaftServer: %s", err)
+ }
} else {
raftServer, err = weed_server.NewRaftServer(raftServerOption)
- }
- if raftServer == nil {
- glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
+ if raftServer == nil {
+ glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
+ }
}
ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
@@ -199,14 +199,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
go grpcS.Serve(grpcL)
- go func() {
- time.Sleep(1500 * time.Millisecond)
- if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
- if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
- raftServer.DoJoinCommand()
+ timeSleep := 1500 * time.Millisecond
+ if !*m.raftHashicorp {
+ go func() {
+ time.Sleep(timeSleep)
+ if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
+ if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
+ raftServer.DoJoinCommand()
+ }
}
- }
- }()
+ }()
+ }
go ms.MasterClient.KeepConnectedToMaster()
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index b63e3a418..37e7f245c 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -160,19 +160,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
}
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
- ms.Topo.RaftServer = raftServer.raftServer
- ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
- stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
- if ms.Topo.RaftServer.Leader() != "" {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
- }
- })
+ var raftServerName string
+ if raftServer.raftServer != nil {
+ ms.Topo.RaftServer = raftServer.raftServer
+ ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
+ glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
+ if ms.Topo.RaftServer.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
+ }
+ })
+ raftServerName = ms.Topo.RaftServer.Name()
+ } else if raftServer.RaftHashicorp != nil {
+ ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
+ leaderCh := raftServer.RaftHashicorp.LeaderCh()
+ prevLeader := ms.Topo.HashicorpRaft.Leader()
+ go func() {
+ for {
+ select {
+ case isLeader := <-leaderCh:
+ leader := ms.Topo.HashicorpRaft.Leader()
+ glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
+ prevLeader = leader
+ }
+ }
+ }()
+ raftServerName = ms.Topo.HashicorpRaft.String()
+ }
if ms.Topo.IsLeader() {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
+ glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
} else {
- if ms.Topo.RaftServer.Leader() != "" {
+ if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
+ } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
}
}
}
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
index caef42f62..2ee92331b 100644
--- a/weed/server/raft_hashicorp.go
+++ b/weed/server/raft_hashicorp.go
@@ -4,7 +4,6 @@ package weed_server
// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18
import (
- "context"
"fmt"
transport "github.com/Jille/raft-grpc-transport"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -17,7 +16,7 @@ import (
"time"
)
-func NewHashicorpRaftServer(ctx context.Context, option *RaftServerOption) (*RaftServer, error) {
+func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,
serverAddr: option.ServerAddr,
@@ -27,18 +26,20 @@ func NewHashicorpRaftServer(ctx context.Context, option *RaftServerOption) (*Raf
c := raft.DefaultConfig()
c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change
+ c.NoSnapshotRestoreOnStart = option.RaftResumeState
c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
c.ElectionTimeout = option.ElectionTimeout
+ if c.LeaderLeaseTimeout > c.HeartbeatTimeout {
+ c.LeaderLeaseTimeout = c.HeartbeatTimeout
+ }
if glog.V(4) {
- c.Logger.SetLevel(1)
- } else if glog.V(3) {
- c.Logger.SetLevel(2)
+ c.LogLevel = "Debug"
} else if glog.V(2) {
- c.Logger.SetLevel(3)
+ c.LogLevel = "Info"
} else if glog.V(1) {
- c.Logger.SetLevel(4)
+ c.LogLevel = "Warn"
} else if glog.V(0) {
- c.Logger.SetLevel(5)
+ c.LogLevel = "Error"
}
baseDir := s.dataDir
@@ -58,41 +59,55 @@ func NewHashicorpRaftServer(ctx context.Context, option *RaftServerOption) (*Raf
return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
}
- // s.GrpcServer = raft.NewGrpcServer(s.raftServer)
s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption})
stateMachine := StateMachine{topo: option.Topo}
- r, err := raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport())
+ s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport())
if err != nil {
return nil, fmt.Errorf("raft.NewRaft: %v", err)
}
-
if option.RaftBootstrap {
cfg := raft.Configuration{
Servers: []raft.Server{
{
Suffrage: raft.Voter,
ID: c.LocalID,
- Address: raft.ServerAddress(s.serverAddr),
+ Address: raft.ServerAddress(s.serverAddr.ToGrpcAddress()),
},
},
}
// Add known peers to bootstrap
- for _, node := range option.Peers {
- if node == option.ServerAddr {
+ for _, peer := range option.Peers {
+ if peer == option.ServerAddr {
continue
}
cfg.Servers = append(cfg.Servers, raft.Server{
Suffrage: raft.Voter,
- ID: raft.ServerID(node),
- Address: raft.ServerAddress(node),
+ ID: raft.ServerID(peer),
+ Address: raft.ServerAddress(peer.ToGrpcAddress()),
})
}
- f := r.BootstrapCluster(cfg)
+ f := s.RaftHashicorp.BootstrapCluster(cfg)
if err := f.Error(); err != nil {
return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
}
}
+ ticker := time.NewTicker(c.HeartbeatTimeout * 10)
+ if glog.V(4) {
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ cfuture := s.RaftHashicorp.GetConfiguration()
+ if err = cfuture.Error(); err != nil {
+ glog.Fatalf("error getting config: %s", err)
+ }
+ configuration := cfuture.Configuration()
+ glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers)
+ }
+ }
+ }()
+ }
return s, nil
}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 5ed3724e2..3fedc843e 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -36,7 +36,7 @@ type RaftServerOption struct {
type RaftServer struct {
peers map[string]pb.ServerAddress // initial peers to join with
raftServer raft.Server
- raftHashicorp hashicorpRaft.Raft
+ RaftHashicorp *hashicorpRaft.Raft
TransportManager *transport.Manager
dataDir string
serverAddr pb.ServerAddress
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go
index 951de0711..1bcc6b449 100644
--- a/weed/topology/cluster_commands.go
+++ b/weed/topology/cluster_commands.go
@@ -23,6 +23,7 @@ func (c *MaxVolumeIdCommand) CommandName() string {
return "MaxVolumeId"
}
+// deprecatedCommandApply represents the old interface to apply a command to the server.
func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
topo := server.Context().(*Topology)
before := topo.GetMaxVolumeId()
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 207c89ad7..d636e8a9e 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -1,6 +1,7 @@
package topology
import (
+ "encoding/json"
"errors"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -10,6 +11,7 @@ import (
"time"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -40,7 +42,8 @@ type Topology struct {
Configuration *Configuration
- RaftServer raft.Server
+ RaftServer raft.Server
+ HashicorpRaft *hashicorpRaft.Raft
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -76,6 +79,10 @@ func (t *Topology) IsLeader() bool {
return true
}
}
+ } else if t.HashicorpRaft != nil {
+ if t.HashicorpRaft.State() == hashicorpRaft.Leader {
+ return true
+ }
}
return false
}
@@ -85,6 +92,8 @@ func (t *Topology) Leader() (pb.ServerAddress, error) {
for count := 0; count < 3; count++ {
if t.RaftServer != nil {
l = pb.ServerAddress(t.RaftServer.Leader())
+ } else if t.HashicorpRaft != nil {
+ l = pb.ServerAddress(t.HashicorpRaft.Leader())
} else {
return "", errors.New("Raft Server not ready yet!")
}
@@ -124,8 +133,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
- if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
- return 0, err
+ if t.RaftServer != nil {
+ if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
+ return 0, err
+ }
+ } else if t.HashicorpRaft != nil {
+ b, err := json.Marshal(NewMaxVolumeIdCommand(next))
+ if err != nil {
+ return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err)
+ }
+ if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil {
+ return 0, future.Error()
+ }
}
return next, nil
}