diff options
Diffstat (limited to 'weed/topology/topology.go')
| -rw-r--r-- | weed/topology/topology.go | 67 |
1 files changed, 63 insertions, 4 deletions
diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 0a4cb4050..6a149bd56 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -50,8 +50,11 @@ type Topology struct { RaftServer raft.Server RaftServerAccessLock sync.RWMutex HashicorpRaft *hashicorpRaft.Raft - UuidAccessLock sync.RWMutex - UuidMap map[string][]string + barrierLock sync.Mutex + barrierDone bool + + UuidAccessLock sync.RWMutex + UuidMap map[string][]string } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -120,6 +123,42 @@ func (t *Topology) IsLeader() bool { return false } +func (t *Topology) IsLeaderAndCanRead() bool { + if t.RaftServer != nil { + return t.IsLeader() + } else if t.HashicorpRaft != nil { + return t.IsLeader() && t.DoBarrier() + } else { + return false + } +} + +func (t *Topology) DoBarrier() bool { + t.barrierLock.Lock() + defer t.barrierLock.Unlock() + if t.barrierDone { + return true + } + + glog.V(0).Infof("raft do barrier") + barrier := t.HashicorpRaft.Barrier(2 * time.Minute) + if err := barrier.Error(); err != nil { + glog.Errorf("failed to wait for barrier, error %s", err) + return false + + } + + t.barrierDone = true + glog.V(0).Infof("raft do barrier success") + return true +} + +func (t *Topology) BarrierReset() { + t.barrierLock.Lock() + defer t.barrierLock.Unlock() + t.barrierDone = false +} + func (t *Topology) Leader() (l pb.ServerAddress, err error) { exponentialBackoff := backoff.NewExponentialBackOff() exponentialBackoff.InitialInterval = 100 * time.Millisecond @@ -180,6 +219,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []* } func (t *Topology) NextVolumeId() (needle.VolumeId, error) { + if !t.IsLeaderAndCanRead() { + return 0, fmt.Errorf("as leader can not read yet") + + } vid := t.GetMaxVolumeId() next := vid.Next() @@ -208,8 +251,8 @@ func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, if err != nil { return "", 0, nil, shouldGrow, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) } - if volumeLocationList.Length() == 0 { - return "", 0, nil, shouldGrow, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) + if volumeLocationList == nil || volumeLocationList.Length() == 0 { + return "", 0, nil, shouldGrow, fmt.Errorf("%s available for collection:%s replication:%s ttl:%s", noWritableVolumes, option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) } nextFileId := t.Sequence.NextFileId(requestedCount) fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String() @@ -285,6 +328,22 @@ func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { } } +func (t *Topology) DataCenterExists(dcName string) bool { + return dcName == "" || t.GetOrCreateDataCenter(dcName) != nil +} + +func (t *Topology) GetDataCenter(dcName string) (dc *DataCenter) { + t.RLock() + defer t.RUnlock() + for _, c := range t.children { + dc = c.(*DataCenter) + if string(dc.Id()) == dcName { + return dc + } + } + return dc +} + func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { t.Lock() defer t.Unlock() |
