diff options
| author | wyang <wings.wyang@gmail.com> | 2024-07-27 12:48:36 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-26 21:48:36 -0700 |
| commit | 4b1f539ab86d4b634e0c3f08c5eb322c13bab5b3 (patch) | |
| tree | 7d76240faf81bafdfe058c825172f092e0d30467 /weed/topology/topology.go | |
| parent | c1bffca24608554952bf03f0a11838aaa567a78a (diff) | |
| download | seaweedfs-4b1f539ab86d4b634e0c3f08c5eb322c13bab5b3.tar.xz seaweedfs-4b1f539ab86d4b634e0c3f08c5eb322c13bab5b3.zip | |
fix allocate reduplicated volumeId to different volume (#5811)
* fix allocate reduplicated volumeId to different volume
* only check barrier when read
---------
Co-authored-by: Yang Wang <yangwang@weride.ai>
Diffstat (limited to 'weed/topology/topology.go')
| -rw-r--r-- | weed/topology/topology.go | 47 |
1 files changed, 45 insertions, 2 deletions
diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 01822cbf2..44566e361 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -50,8 +50,11 @@ type Topology struct { RaftServer raft.Server RaftServerAccessLock sync.RWMutex HashicorpRaft *hashicorpRaft.Raft - UuidAccessLock sync.RWMutex - UuidMap map[string][]string + barrierLock sync.Mutex + barrierDone bool + + UuidAccessLock sync.RWMutex + UuidMap map[string][]string } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -120,6 +123,42 @@ func (t *Topology) IsLeader() bool { return false } +func (t *Topology) IsLeaderAndCanRead() bool { + if t.RaftServer != nil { + return t.IsLeader() + } else if t.HashicorpRaft != nil { + return t.IsLeader() && t.DoBarrier() + } else { + return false + } +} + +func (t *Topology) DoBarrier() bool { + t.barrierLock.Lock() + defer t.barrierLock.Unlock() + if t.barrierDone { + return true + } + + glog.V(0).Infof("raft do barrier") + barrier := t.HashicorpRaft.Barrier(2 * time.Minute) + if err := barrier.Error(); err != nil { + glog.Errorf("failed to wait for barrier, error %s", err) + return false + + } + + t.barrierDone = true + glog.V(0).Infof("raft do barrier success") + return true +} + +func (t *Topology) BarrierReset() { + t.barrierLock.Lock() + defer t.barrierLock.Unlock() + t.barrierDone = false +} + func (t *Topology) Leader() (l pb.ServerAddress, err error) { exponentialBackoff := backoff.NewExponentialBackOff() exponentialBackoff.InitialInterval = 100 * time.Millisecond @@ -180,6 +219,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []* } func (t *Topology) NextVolumeId() (needle.VolumeId, error) { + if !t.IsLeaderAndCanRead() { + return 0, fmt.Errorf("as leader can not read yet") + + } vid := t.GetMaxVolumeId() next := vid.Next() |
