aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/topology.go
diff options
context:
space:
mode:
authorPatrick Schmidt <patrick.schmidt@innogames.com>2022-08-24 18:49:05 +0200
committerGitHub <noreply@github.com>2022-08-24 09:49:05 -0700
commit7b424a54dc56c883a3e03894d924631a4ef7a94c (patch)
tree0b98b4dcee9d2294b50282529dab8f1b0aac6a39 /weed/topology/topology.go
parentf7e4359b597c9acdb0478c05c6f6ae6fbb56836b (diff)
downloadseaweedfs-7b424a54dc56c883a3e03894d924631a4ef7a94c.tar.xz
seaweedfs-7b424a54dc56c883a3e03894d924631a4ef7a94c.zip
Add raft server access mutex to avoid races (#3503)
Diffstat (limited to 'weed/topology/topology.go')
-rw-r--r--weed/topology/topology.go50
1 files changed, 35 insertions, 15 deletions
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 546642841..35224d280 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -43,10 +43,11 @@ type Topology struct {
Configuration *Configuration
- RaftServer raft.Server
- HashicorpRaft *hashicorpRaft.Raft
- UuidAccessLock sync.RWMutex
- UuidMap map[string][]string
+ RaftServer raft.Server
+ RaftServerAccessLock sync.RWMutex
+ HashicorpRaft *hashicorpRaft.Raft
+ UuidAccessLock sync.RWMutex
+ UuidMap map[string][]string
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -73,6 +74,9 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
}
func (t *Topology) IsLeader() bool {
+ t.RaftServerAccessLock.RLock()
+ defer t.RaftServerAccessLock.RUnlock()
+
if t.RaftServer != nil {
if t.RaftServer.State() == raft.Leader {
return true
@@ -90,23 +94,35 @@ func (t *Topology) IsLeader() bool {
return false
}
-func (t *Topology) Leader() (pb.ServerAddress, error) {
- var l pb.ServerAddress
+func (t *Topology) Leader() (l pb.ServerAddress, err error) {
for count := 0; count < 3; count++ {
- if t.RaftServer != nil {
- l = pb.ServerAddress(t.RaftServer.Leader())
- } else if t.HashicorpRaft != nil {
- l = pb.ServerAddress(t.HashicorpRaft.Leader())
- } else {
- return "", errors.New("Raft Server not ready yet!")
+ l, err = t.MaybeLeader()
+ if err != nil {
+ return
}
if l != "" {
break
- } else {
- time.Sleep(time.Duration(5+count) * time.Second)
}
+
+ time.Sleep(time.Duration(5+count) * time.Second)
+ }
+
+ return
+}
+
+func (t *Topology) MaybeLeader() (l pb.ServerAddress, err error) {
+ t.RaftServerAccessLock.RLock()
+ defer t.RaftServerAccessLock.RUnlock()
+
+ if t.RaftServer != nil {
+ l = pb.ServerAddress(t.RaftServer.Leader())
+ } else if t.HashicorpRaft != nil {
+ l = pb.ServerAddress(t.HashicorpRaft.Leader())
+ } else {
+ err = errors.New("Raft Server not ready yet!")
}
- return l, nil
+
+ return
}
func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) {
@@ -136,6 +152,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
+
+ t.RaftServerAccessLock.RLock()
+ defer t.RaftServerAccessLock.RUnlock()
+
if t.RaftServer != nil {
if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
return 0, err