 weed/server/master_server.go  | 15
 weed/server/raft_hashicorp.go | 96
 weed/server/raft_server.go    |  3
 weed/topology/topology.go     | 47
 4 files changed, 105 insertions(+), 56 deletions(-)
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 65fa622e7..44a1664c0 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -186,22 +186,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
raftServerName = fmt.Sprintf("[%s]", ms.Topo.RaftServer.Name())
} else if raftServer.RaftHashicorp != nil {
ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
- leaderCh := raftServer.RaftHashicorp.LeaderCh()
- prevLeader, _ := ms.Topo.HashicorpRaft.LeaderWithID()
raftServerName = ms.Topo.HashicorpRaft.String()
- go func() {
- for {
- select {
- case isLeader := <-leaderCh:
- ms.Topo.RaftServerAccessLock.RLock()
- leader, _ := ms.Topo.HashicorpRaft.LeaderWithID()
- ms.Topo.RaftServerAccessLock.RUnlock()
- glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
- stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
- prevLeader = leader
- }
- }
- }()
}
ms.Topo.RaftServerAccessLock.Unlock()
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
index d06066b93..299df323a 100644
--- a/weed/server/raft_hashicorp.go
+++ b/weed/server/raft_hashicorp.go
@@ -5,6 +5,14 @@ package weed_server
import (
"fmt"
+ "math/rand"
+ "os"
+ "path"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+
transport "github.com/Jille/raft-grpc-transport"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
@@ -14,13 +22,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"google.golang.org/grpc"
- "math/rand"
- "os"
- "path"
- "path/filepath"
- "sort"
- "strings"
- "time"
)
const (
@@ -56,46 +57,61 @@ func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) {
return cfg
}
-func (s *RaftServer) UpdatePeers() {
+func (s *RaftServer) monitorLeaderLoop(updatePeers bool) {
for {
+ prevLeader, _ := s.RaftHashicorp.LeaderWithID()
select {
case isLeader := <-s.RaftHashicorp.LeaderCh():
+ leader, _ := s.RaftHashicorp.LeaderWithID()
if isLeader {
- peerLeader := string(s.serverAddr)
- existsPeerName := make(map[string]bool)
- for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
- if string(server.ID) == peerLeader {
- continue
- }
- existsPeerName[string(server.ID)] = true
- }
- for _, peer := range s.peers {
- peerName := string(peer)
- if peerName == peerLeader || existsPeerName[peerName] {
- continue
- }
- glog.V(0).Infof("adding new peer: %s", peerName)
- s.RaftHashicorp.AddVoter(
- raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
- }
- for peer := range existsPeerName {
- if _, found := s.peers[peer]; !found {
- glog.V(0).Infof("removing old peer: %s", peer)
- s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
- }
- }
- if _, found := s.peers[peerLeader]; !found {
- glog.V(0).Infof("removing old leader peer: %s", peerLeader)
- s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+
+ if updatePeers {
+ s.updatePeers()
+ updatePeers = false
}
+
+ s.topo.DoBarrier()
+
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
+ } else {
+ s.topo.BarrierReset()
}
- return
- case <-time.After(updatePeersTimeout):
- return
+ glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
+ prevLeader = leader
}
}
}
+func (s *RaftServer) updatePeers() {
+ peerLeader := string(s.serverAddr)
+ existsPeerName := make(map[string]bool)
+ for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerLeader {
+ continue
+ }
+ existsPeerName[string(server.ID)] = true
+ }
+ for _, peer := range s.peers {
+ peerName := string(peer)
+ if peerName == peerLeader || existsPeerName[peerName] {
+ continue
+ }
+ glog.V(0).Infof("adding new peer: %s", peerName)
+ s.RaftHashicorp.AddVoter(
+ raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
+ }
+ for peer := range existsPeerName {
+ if _, found := s.peers[peer]; !found {
+ glog.V(0).Infof("removing old peer: %s", peer)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
+ }
+ }
+ if _, found := s.peers[peerLeader]; !found {
+ glog.V(0).Infof("removing old leader peer: %s", peerLeader)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+ }
+}
+
func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,
@@ -157,6 +173,8 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
if err != nil {
return nil, fmt.Errorf("raft.NewRaft: %v", err)
}
+
+ updatePeers := false
if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 {
cfg := s.AddPeersConfiguration()
// Need to get lock, in case all servers do this at the same time.
@@ -169,9 +187,11 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
}
} else {
- go s.UpdatePeers()
+ updatePeers = true
}
+ go s.monitorLeaderLoop(updatePeers)
+
ticker := time.NewTicker(c.HeartbeatTimeout * 10)
if glog.V(4) {
go func() {
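
The leader-change watcher formerly inlined in master_server.go now lives in monitorLeaderLoop above. As a minimal, self-contained sketch of the hashicorp/raft notification pattern it builds on (LeaderCh delivers a bool on every gain or loss of leadership, LeaderWithID reports the current leader), with the function name watchLeadership and the onGain/onLose callbacks being illustrative only, not part of this patch:

package example

import (
	"log"

	"github.com/hashicorp/raft"
)

// watchLeadership consumes raft.LeaderCh and dispatches on gain/loss of
// leadership, logging the current leader reported by LeaderWithID.
func watchLeadership(r *raft.Raft, onGain, onLose func()) {
	for isLeader := range r.LeaderCh() {
		leaderAddr, leaderID := r.LeaderWithID()
		if isLeader {
			onGain()
		} else {
			onLose()
		}
		log.Printf("leadership change: isLeader=%v leader=%s (%s)", isLeader, leaderAddr, leaderID)
	}
}
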
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index d718ecac7..4bcd808c2 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,13 +2,14 @@ package weed_server
import (
"encoding/json"
- transport "github.com/Jille/raft-grpc-transport"
"io"
"math/rand"
"os"
"path"
"time"
+ transport "github.com/Jille/raft-grpc-transport"
+
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/pb"
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 01822cbf2..44566e361 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -50,8 +50,11 @@ type Topology struct {
RaftServer raft.Server
RaftServerAccessLock sync.RWMutex
HashicorpRaft *hashicorpRaft.Raft
- UuidAccessLock sync.RWMutex
- UuidMap map[string][]string
+ barrierLock sync.Mutex
+ barrierDone bool
+
+ UuidAccessLock sync.RWMutex
+ UuidMap map[string][]string
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -120,6 +123,42 @@ func (t *Topology) IsLeader() bool {
return false
}
+func (t *Topology) IsLeaderAndCanRead() bool {
+ if t.RaftServer != nil {
+ return t.IsLeader()
+ } else if t.HashicorpRaft != nil {
+ return t.IsLeader() && t.DoBarrier()
+ } else {
+ return false
+ }
+}
+
+func (t *Topology) DoBarrier() bool {
+ t.barrierLock.Lock()
+ defer t.barrierLock.Unlock()
+ if t.barrierDone {
+ return true
+ }
+
+ glog.V(0).Infof("raft do barrier")
+ barrier := t.HashicorpRaft.Barrier(2 * time.Minute)
+ if err := barrier.Error(); err != nil {
+ glog.Errorf("failed to wait for barrier, error %s", err)
+ return false
+
+ }
+
+ t.barrierDone = true
+ glog.V(0).Infof("raft do barrier success")
+ return true
+}
+
+func (t *Topology) BarrierReset() {
+ t.barrierLock.Lock()
+ defer t.barrierLock.Unlock()
+ t.barrierDone = false
+}
+
func (t *Topology) Leader() (l pb.ServerAddress, err error) {
exponentialBackoff := backoff.NewExponentialBackOff()
exponentialBackoff.InitialInterval = 100 * time.Millisecond
@@ -180,6 +219,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
}
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
+ if !t.IsLeaderAndCanRead() {
+ return 0, fmt.Errorf("as leader can not read yet")
+
+ }
vid := t.GetMaxVolumeId()
next := vid.Next()
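
The new IsLeaderAndCanRead/DoBarrier pair gates reads such as NextVolumeId on a raft barrier, so a freshly elected leader does not answer from a state machine that is still applying previously committed log entries. A minimal sketch of the underlying hashicorp/raft call, assuming only the public Barrier API (the helper name waitForReadableLeader is illustrative, not part of this patch):

package example

import (
	"time"

	"github.com/hashicorp/raft"
)

// waitForReadableLeader blocks until all log entries committed before the
// call have been applied to the FSM, or until the timeout expires.
func waitForReadableLeader(r *raft.Raft, timeout time.Duration) error {
	f := r.Barrier(timeout) // returns a Future; Error() blocks until applied
	return f.Error()
}
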