aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Schmidt <patrick.schmidt@innogames.com>2022-08-24 18:49:05 +0200
committerGitHub <noreply@github.com>2022-08-24 09:49:05 -0700
commit7b424a54dc56c883a3e03894d924631a4ef7a94c (patch)
tree0b98b4dcee9d2294b50282529dab8f1b0aac6a39
parentf7e4359b597c9acdb0478c05c6f6ae6fbb56836b (diff)
downloadseaweedfs-7b424a54dc56c883a3e03894d924631a4ef7a94c.tar.xz
seaweedfs-7b424a54dc56c883a3e03894d924631a4ef7a94c.zip
Add raft server access mutex to avoid races (#3503)
-rw-r--r--weed/command/master.go10
-rw-r--r--weed/server/master_grpc_server_raft.go11
-rw-r--r--weed/server/master_server.go22
-rw-r--r--weed/server/master_server_handlers_ui.go4
-rw-r--r--weed/topology/topology.go50
5 files changed, 72 insertions, 25 deletions
diff --git a/weed/command/master.go b/weed/command/master.go
index 6ef511742..908299c8a 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -206,11 +206,13 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
if !*m.raftHashicorp {
go func() {
time.Sleep(timeSleep)
- if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
- if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
- raftServer.DoJoinCommand()
- }
+
+ ms.Topo.RaftServerAccessLock.RLock()
+ isEmptyMaster := ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty()
+ if isEmptyMaster && isTheFirstOne(myMasterAddress, peers) && ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
+ raftServer.DoJoinCommand()
}
+ ms.Topo.RaftServerAccessLock.RUnlock()
}()
}
diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go
index 45a6c7a7e..7f8ad70df 100644
--- a/weed/server/master_grpc_server_raft.go
+++ b/weed/server/master_grpc_server_raft.go
@@ -3,7 +3,9 @@ package weed_server
import (
"context"
"fmt"
+
"github.com/hashicorp/raft"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
@@ -11,11 +13,14 @@ import (
func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) {
resp := &master_pb.RaftListClusterServersResponse{}
+ ms.Topo.RaftServerAccessLock.RLock()
if ms.Topo.HashicorpRaft == nil {
+ ms.Topo.RaftServerAccessLock.RUnlock()
return resp, nil
}
servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
+ ms.Topo.RaftServerAccessLock.RUnlock()
for _, server := range servers {
resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
@@ -30,6 +35,9 @@ func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_
func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) {
resp := &master_pb.RaftAddServerResponse{}
+ ms.Topo.RaftServerAccessLock.RLock()
+ defer ms.Topo.RaftServerAccessLock.RUnlock()
+
if ms.Topo.HashicorpRaft == nil {
return resp, nil
}
@@ -54,6 +62,9 @@ func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAd
func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) {
resp := &master_pb.RaftRemoveServerResponse{}
+ ms.Topo.RaftServerAccessLock.RLock()
+ defer ms.Topo.RaftServerAccessLock.RUnlock()
+
if ms.Topo.HashicorpRaft == nil {
return resp, nil
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index feee59455..ecbfd64af 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -166,6 +166,8 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
var raftServerName string
+
+ ms.Topo.RaftServerAccessLock.Lock()
if raftServer.raftServer != nil {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
@@ -193,14 +195,18 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
}()
raftServerName = ms.Topo.HashicorpRaft.String()
}
+ ms.Topo.RaftServerAccessLock.Unlock()
+
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
} else {
+ ms.Topo.RaftServerAccessLock.RLock()
if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
} else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
}
+ ms.Topo.RaftServerAccessLock.RUnlock()
}
}
@@ -210,16 +216,15 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
f(w, r)
return
}
- var raftServerLeader string
- if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
- raftServerLeader = ms.Topo.RaftServer.Leader()
- } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
- raftServerLeader = string(ms.Topo.HashicorpRaft.Leader())
- }
+
+ // get the current raft leader
+ leaderAddr, _ := ms.Topo.MaybeLeader()
+ raftServerLeader := string(leaderAddr)
if raftServerLeader == "" {
f(w, r)
return
}
+
ms.boundedLeaderChan <- 1
defer func() { <-ms.boundedLeaderChan }()
targetUrl, err := url.Parse("http://" + raftServerLeader)
@@ -228,6 +233,8 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err))
return
}
+
+ // proxy to leader
glog.V(4).Infoln("proxying to leader", raftServerLeader)
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
director := proxy.Director
@@ -336,6 +343,9 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
}
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+ ms.Topo.RaftServerAccessLock.RLock()
+ defer ms.Topo.RaftServerAccessLock.RUnlock()
+
if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
return
}
diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go
index e377a0e19..2c6fa3474 100644
--- a/weed/server/master_server_handlers_ui.go
+++ b/weed/server/master_server_handlers_ui.go
@@ -16,6 +16,10 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId()
+
+ ms.Topo.RaftServerAccessLock.RLock()
+ defer ms.Topo.RaftServerAccessLock.RUnlock()
+
if ms.Topo.RaftServer != nil {
args := struct {
Version string
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 546642841..35224d280 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -43,10 +43,11 @@ type Topology struct {
Configuration *Configuration
- RaftServer raft.Server
- HashicorpRaft *hashicorpRaft.Raft
- UuidAccessLock sync.RWMutex
- UuidMap map[string][]string
+ RaftServer raft.Server
+ RaftServerAccessLock sync.RWMutex
+ HashicorpRaft *hashicorpRaft.Raft
+ UuidAccessLock sync.RWMutex
+ UuidMap map[string][]string
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -73,6 +74,9 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
}
func (t *Topology) IsLeader() bool {
+ t.RaftServerAccessLock.RLock()
+ defer t.RaftServerAccessLock.RUnlock()
+
if t.RaftServer != nil {
if t.RaftServer.State() == raft.Leader {
return true
@@ -90,23 +94,35 @@ func (t *Topology) IsLeader() bool {
return false
}
-func (t *Topology) Leader() (pb.ServerAddress, error) {
- var l pb.ServerAddress
+func (t *Topology) Leader() (l pb.ServerAddress, err error) {
for count := 0; count < 3; count++ {
- if t.RaftServer != nil {
- l = pb.ServerAddress(t.RaftServer.Leader())
- } else if t.HashicorpRaft != nil {
- l = pb.ServerAddress(t.HashicorpRaft.Leader())
- } else {
- return "", errors.New("Raft Server not ready yet!")
+ l, err = t.MaybeLeader()
+ if err != nil {
+ return
}
if l != "" {
break
- } else {
- time.Sleep(time.Duration(5+count) * time.Second)
}
+
+ time.Sleep(time.Duration(5+count) * time.Second)
+ }
+
+ return
+}
+
+func (t *Topology) MaybeLeader() (l pb.ServerAddress, err error) {
+ t.RaftServerAccessLock.RLock()
+ defer t.RaftServerAccessLock.RUnlock()
+
+ if t.RaftServer != nil {
+ l = pb.ServerAddress(t.RaftServer.Leader())
+ } else if t.HashicorpRaft != nil {
+ l = pb.ServerAddress(t.HashicorpRaft.Leader())
+ } else {
+ err = errors.New("Raft Server not ready yet!")
}
- return l, nil
+
+ return
}
func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) {
@@ -136,6 +152,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
+
+ t.RaftServerAccessLock.RLock()
+ defer t.RaftServerAccessLock.RUnlock()
+
if t.RaftServer != nil {
if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
return 0, err