about summary refs log tree commit diff
path: root/weed/topology/topology.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/topology.go')
-rw-r--r--  weed/topology/topology.go  67
1 file changed, 63 insertions, 4 deletions
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 0a4cb4050..6a149bd56 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -50,8 +50,11 @@ type Topology struct {
RaftServer raft.Server
RaftServerAccessLock sync.RWMutex
HashicorpRaft *hashicorpRaft.Raft
- UuidAccessLock sync.RWMutex
- UuidMap map[string][]string
+ barrierLock sync.Mutex
+ barrierDone bool
+
+ UuidAccessLock sync.RWMutex
+ UuidMap map[string][]string
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -120,6 +123,42 @@ func (t *Topology) IsLeader() bool {
return false
}
+func (t *Topology) IsLeaderAndCanRead() bool {
+ if t.RaftServer != nil {
+ return t.IsLeader()
+ } else if t.HashicorpRaft != nil {
+ return t.IsLeader() && t.DoBarrier()
+ } else {
+ return false
+ }
+}
+
+func (t *Topology) DoBarrier() bool {
+ t.barrierLock.Lock()
+ defer t.barrierLock.Unlock()
+ if t.barrierDone {
+ return true
+ }
+
+ glog.V(0).Infof("raft do barrier")
+ barrier := t.HashicorpRaft.Barrier(2 * time.Minute)
+ if err := barrier.Error(); err != nil {
+ glog.Errorf("failed to wait for barrier, error %s", err)
+ return false
+
+ }
+
+ t.barrierDone = true
+ glog.V(0).Infof("raft do barrier success")
+ return true
+}
+
+func (t *Topology) BarrierReset() {
+ t.barrierLock.Lock()
+ defer t.barrierLock.Unlock()
+ t.barrierDone = false
+}
+
func (t *Topology) Leader() (l pb.ServerAddress, err error) {
exponentialBackoff := backoff.NewExponentialBackOff()
exponentialBackoff.InitialInterval = 100 * time.Millisecond
@@ -180,6 +219,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
}
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
+ if !t.IsLeaderAndCanRead() {
+ return 0, fmt.Errorf("as leader can not read yet")
+
+ }
vid := t.GetMaxVolumeId()
next := vid.Next()
@@ -208,8 +251,8 @@ func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption,
if err != nil {
return "", 0, nil, shouldGrow, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
}
- if volumeLocationList.Length() == 0 {
- return "", 0, nil, shouldGrow, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
+ if volumeLocationList == nil || volumeLocationList.Length() == 0 {
+ return "", 0, nil, shouldGrow, fmt.Errorf("%s available for collection:%s replication:%s ttl:%s", noWritableVolumes, option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
nextFileId := t.Sequence.NextFileId(requestedCount)
fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String()
@@ -285,6 +328,22 @@ func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
}
}
+func (t *Topology) DataCenterExists(dcName string) bool {
+ return dcName == "" || t.GetOrCreateDataCenter(dcName) != nil
+}
+
+func (t *Topology) GetDataCenter(dcName string) (dc *DataCenter) {
+ t.RLock()
+ defer t.RUnlock()
+ for _, c := range t.children {
+ dc = c.(*DataCenter)
+ if string(dc.Id()) == dcName {
+ return dc
+ }
+ }
+ return dc
+}
+
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
t.Lock()
defer t.Unlock()