diff options
| author | wyang <wings.wyang@gmail.com> | 2024-07-27 12:48:36 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-26 21:48:36 -0700 |
| commit | 4b1f539ab86d4b634e0c3f08c5eb322c13bab5b3 (patch) | |
| tree | 7d76240faf81bafdfe058c825172f092e0d30467 /weed/server/raft_hashicorp.go | |
| parent | c1bffca24608554952bf03f0a11838aaa567a78a (diff) | |
| download | seaweedfs-4b1f539ab86d4b634e0c3f08c5eb322c13bab5b3.tar.xz seaweedfs-4b1f539ab86d4b634e0c3f08c5eb322c13bab5b3.zip | |
fix allocate reduplicated volumeId to different volume (#5811)
* fix allocate reduplicated volumeId to different volume
* only check barrier when read
---------
Co-authored-by: Yang Wang <yangwang@weride.ai>
Diffstat (limited to 'weed/server/raft_hashicorp.go')
| -rw-r--r-- | weed/server/raft_hashicorp.go | 96 |
1 files changed, 58 insertions, 38 deletions
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index d06066b93..299df323a 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -5,6 +5,14 @@ package weed_server import ( "fmt" + "math/rand" + "os" + "path" + "path/filepath" + "sort" + "strings" + "time" + transport "github.com/Jille/raft-grpc-transport" "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" @@ -14,13 +22,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/stats" "google.golang.org/grpc" - "math/rand" - "os" - "path" - "path/filepath" - "sort" - "strings" - "time" ) const ( @@ -56,46 +57,61 @@ func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { return cfg } -func (s *RaftServer) UpdatePeers() { +func (s *RaftServer) monitorLeaderLoop(updatePeers bool) { for { + prevLeader, _ := s.RaftHashicorp.LeaderWithID() select { case isLeader := <-s.RaftHashicorp.LeaderCh(): + leader, _ := s.RaftHashicorp.LeaderWithID() if isLeader { - peerLeader := string(s.serverAddr) - existsPeerName := make(map[string]bool) - for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { - if string(server.ID) == peerLeader { - continue - } - existsPeerName[string(server.ID)] = true - } - for _, peer := range s.peers { - peerName := string(peer) - if peerName == peerLeader || existsPeerName[peerName] { - continue - } - glog.V(0).Infof("adding new peer: %s", peerName) - s.RaftHashicorp.AddVoter( - raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) - } - for peer := range existsPeerName { - if _, found := s.peers[peer]; !found { - glog.V(0).Infof("removing old peer: %s", peer) - s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) - } - } - if _, found := s.peers[peerLeader]; !found { - glog.V(0).Infof("removing old leader peer: %s", peerLeader) - s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + + if updatePeers { + s.updatePeers() + updatePeers = false } + + s.topo.DoBarrier() + + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() + } else { + s.topo.BarrierReset() } - return - case <-time.After(updatePeersTimeout): - return + glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) + prevLeader = leader } } } +func (s *RaftServer) updatePeers() { + peerLeader := string(s.serverAddr) + existsPeerName := make(map[string]bool) + for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerLeader { + continue + } + existsPeerName[string(server.ID)] = true + } + for _, peer := range s.peers { + peerName := string(peer) + if peerName == peerLeader || existsPeerName[peerName] { + continue + } + glog.V(0).Infof("adding new peer: %s", peerName) + s.RaftHashicorp.AddVoter( + raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) + } + for peer := range existsPeerName { + if _, found := s.peers[peer]; !found { + glog.V(0).Infof("removing old peer: %s", peer) + s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) + } + } + if _, found := s.peers[peerLeader]; !found { + glog.V(0).Infof("removing old leader peer: %s", peerLeader) + s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + } +} + func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, @@ -157,6 +173,8 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { if err != nil { return nil, fmt.Errorf("raft.NewRaft: %v", err) } + + updatePeers := false if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 { cfg := s.AddPeersConfiguration() // Need to get lock, in case all servers do this at the same time. @@ -169,9 +187,11 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) } } else { - go s.UpdatePeers() + updatePeers = true } + go s.monitorLeaderLoop(updatePeers) + ticker := time.NewTicker(c.HeartbeatTimeout * 10) if glog.V(4) { go func() { |
