aboutsummaryrefslogtreecommitdiff
path: root/weed/server/raft_hashicorp.go
diff options
context:
space:
mode:
authorwyang <wings.wyang@gmail.com>2024-07-27 12:48:36 +0800
committerGitHub <noreply@github.com>2024-07-26 21:48:36 -0700
commit4b1f539ab86d4b634e0c3f08c5eb322c13bab5b3 (patch)
tree7d76240faf81bafdfe058c825172f092e0d30467 /weed/server/raft_hashicorp.go
parentc1bffca24608554952bf03f0a11838aaa567a78a (diff)
downloadseaweedfs-4b1f539ab86d4b634e0c3f08c5eb322c13bab5b3.tar.xz
seaweedfs-4b1f539ab86d4b634e0c3f08c5eb322c13bab5b3.zip
fix allocate reduplicated volumeId to different volume (#5811)
* fix allocate reduplicated volumeId to different volume * only check barrier when read --------- Co-authored-by: Yang Wang <yangwang@weride.ai>
Diffstat (limited to 'weed/server/raft_hashicorp.go')
-rw-r--r--weed/server/raft_hashicorp.go96
1 files changed, 58 insertions, 38 deletions
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
index d06066b93..299df323a 100644
--- a/weed/server/raft_hashicorp.go
+++ b/weed/server/raft_hashicorp.go
@@ -5,6 +5,14 @@ package weed_server
import (
"fmt"
+ "math/rand"
+ "os"
+ "path"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+
transport "github.com/Jille/raft-grpc-transport"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
@@ -14,13 +22,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"google.golang.org/grpc"
- "math/rand"
- "os"
- "path"
- "path/filepath"
- "sort"
- "strings"
- "time"
)
const (
@@ -56,46 +57,61 @@ func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) {
return cfg
}
-func (s *RaftServer) UpdatePeers() {
+func (s *RaftServer) monitorLeaderLoop(updatePeers bool) {
for {
+ prevLeader, _ := s.RaftHashicorp.LeaderWithID()
select {
case isLeader := <-s.RaftHashicorp.LeaderCh():
+ leader, _ := s.RaftHashicorp.LeaderWithID()
if isLeader {
- peerLeader := string(s.serverAddr)
- existsPeerName := make(map[string]bool)
- for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
- if string(server.ID) == peerLeader {
- continue
- }
- existsPeerName[string(server.ID)] = true
- }
- for _, peer := range s.peers {
- peerName := string(peer)
- if peerName == peerLeader || existsPeerName[peerName] {
- continue
- }
- glog.V(0).Infof("adding new peer: %s", peerName)
- s.RaftHashicorp.AddVoter(
- raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
- }
- for peer := range existsPeerName {
- if _, found := s.peers[peer]; !found {
- glog.V(0).Infof("removing old peer: %s", peer)
- s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
- }
- }
- if _, found := s.peers[peerLeader]; !found {
- glog.V(0).Infof("removing old leader peer: %s", peerLeader)
- s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+
+ if updatePeers {
+ s.updatePeers()
+ updatePeers = false
}
+
+ s.topo.DoBarrier()
+
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
+ } else {
+ s.topo.BarrierReset()
}
- return
- case <-time.After(updatePeersTimeout):
- return
+ glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
+ prevLeader = leader
}
}
}
+func (s *RaftServer) updatePeers() {
+ peerLeader := string(s.serverAddr)
+ existsPeerName := make(map[string]bool)
+ for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerLeader {
+ continue
+ }
+ existsPeerName[string(server.ID)] = true
+ }
+ for _, peer := range s.peers {
+ peerName := string(peer)
+ if peerName == peerLeader || existsPeerName[peerName] {
+ continue
+ }
+ glog.V(0).Infof("adding new peer: %s", peerName)
+ s.RaftHashicorp.AddVoter(
+ raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
+ }
+ for peer := range existsPeerName {
+ if _, found := s.peers[peer]; !found {
+ glog.V(0).Infof("removing old peer: %s", peer)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
+ }
+ }
+ if _, found := s.peers[peerLeader]; !found {
+ glog.V(0).Infof("removing old leader peer: %s", peerLeader)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+ }
+}
+
func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,
@@ -157,6 +173,8 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
if err != nil {
return nil, fmt.Errorf("raft.NewRaft: %v", err)
}
+
+ updatePeers := false
if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 {
cfg := s.AddPeersConfiguration()
// Need to get lock, in case all servers do this at the same time.
@@ -169,9 +187,11 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
}
} else {
- go s.UpdatePeers()
+ updatePeers = true
}
+ go s.monitorLeaderLoop(updatePeers)
+
ticker := time.NewTicker(c.HeartbeatTimeout * 10)
if glog.V(4) {
go func() {