diff options
Diffstat (limited to 'weed/topology/topology.go')
| -rw-r--r-- | weed/topology/topology.go | 50 |
1 files changed, 35 insertions, 15 deletions
diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 546642841..35224d280 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -43,10 +43,11 @@ type Topology struct { Configuration *Configuration - RaftServer raft.Server - HashicorpRaft *hashicorpRaft.Raft - UuidAccessLock sync.RWMutex - UuidMap map[string][]string + RaftServer raft.Server + RaftServerAccessLock sync.RWMutex + HashicorpRaft *hashicorpRaft.Raft + UuidAccessLock sync.RWMutex + UuidMap map[string][]string } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -73,6 +74,9 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls } func (t *Topology) IsLeader() bool { + t.RaftServerAccessLock.RLock() + defer t.RaftServerAccessLock.RUnlock() + if t.RaftServer != nil { if t.RaftServer.State() == raft.Leader { return true @@ -90,23 +94,35 @@ func (t *Topology) IsLeader() bool { return false } -func (t *Topology) Leader() (pb.ServerAddress, error) { - var l pb.ServerAddress +func (t *Topology) Leader() (l pb.ServerAddress, err error) { for count := 0; count < 3; count++ { - if t.RaftServer != nil { - l = pb.ServerAddress(t.RaftServer.Leader()) - } else if t.HashicorpRaft != nil { - l = pb.ServerAddress(t.HashicorpRaft.Leader()) - } else { - return "", errors.New("Raft Server not ready yet!") + l, err = t.MaybeLeader() + if err != nil { + return } if l != "" { break - } else { - time.Sleep(time.Duration(5+count) * time.Second) } + + time.Sleep(time.Duration(5+count) * time.Second) + } + + return +} + +func (t *Topology) MaybeLeader() (l pb.ServerAddress, err error) { + t.RaftServerAccessLock.RLock() + defer t.RaftServerAccessLock.RUnlock() + + if t.RaftServer != nil { + l = pb.ServerAddress(t.RaftServer.Leader()) + } else if t.HashicorpRaft != nil { + l = pb.ServerAddress(t.HashicorpRaft.Leader()) + } else { + err = errors.New("Raft Server not ready yet!") } - return l, nil + + return } func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) { @@ -136,6 +152,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []* func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() + + t.RaftServerAccessLock.RLock() + defer t.RaftServerAccessLock.RUnlock() + if t.RaftServer != nil { if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { return 0, err |
