aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/topology.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/topology.go')
-rw-r--r--weed/topology/topology.go47
1 files changed, 45 insertions, 2 deletions
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 01822cbf2..44566e361 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -50,8 +50,11 @@ type Topology struct {
RaftServer raft.Server
RaftServerAccessLock sync.RWMutex
HashicorpRaft *hashicorpRaft.Raft
- UuidAccessLock sync.RWMutex
- UuidMap map[string][]string
+ barrierLock sync.Mutex
+ barrierDone bool
+
+ UuidAccessLock sync.RWMutex
+ UuidMap map[string][]string
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -120,6 +123,42 @@ func (t *Topology) IsLeader() bool {
return false
}
+func (t *Topology) IsLeaderAndCanRead() bool {
+ if t.RaftServer != nil {
+ return t.IsLeader()
+ } else if t.HashicorpRaft != nil {
+ return t.IsLeader() && t.DoBarrier()
+ } else {
+ return false
+ }
+}
+
+func (t *Topology) DoBarrier() bool {
+ t.barrierLock.Lock()
+ defer t.barrierLock.Unlock()
+ if t.barrierDone {
+ return true
+ }
+
+ glog.V(0).Infof("raft do barrier")
+ barrier := t.HashicorpRaft.Barrier(2 * time.Minute)
+ if err := barrier.Error(); err != nil {
+ glog.Errorf("failed to wait for barrier, error %s", err)
+ return false
+
+ }
+
+ t.barrierDone = true
+ glog.V(0).Infof("raft do barrier success")
+ return true
+}
+
+func (t *Topology) BarrierReset() {
+ t.barrierLock.Lock()
+ defer t.barrierLock.Unlock()
+ t.barrierDone = false
+}
+
func (t *Topology) Leader() (l pb.ServerAddress, err error) {
exponentialBackoff := backoff.NewExponentialBackOff()
exponentialBackoff.InitialInterval = 100 * time.Millisecond
@@ -180,6 +219,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
}
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
+ if !t.IsLeaderAndCanRead() {
+ return 0, fmt.Errorf("as leader can not read yet")
+
+ }
vid := t.GetMaxVolumeId()
next := vid.Next()