aboutsummaryrefslogtreecommitdiff
path: root/weed/server/raft_server.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-10-07 01:25:39 -0700
committerChris Lu <chris.lu@gmail.com>2020-10-07 01:25:39 -0700
commitda4edf3651d0dd17570057388e5e012eeda4ff80 (patch)
tree8122c14acacf89d66dec12f97c4a16075e5b2588 /weed/server/raft_server.go
parentc543762e23d5f38dc90365f6e9c04abc5aa5c0d4 (diff)
downloadseaweedfs-da4edf3651d0dd17570057388e5e012eeda4ff80.tar.xz
seaweedfs-da4edf3651d0dd17570057388e5e012eeda4ff80.zip
master: check peers for existing leader before starting a leader election
fix https://github.com/chrislusf/seaweedfs/issues/1509
Diffstat (limited to 'weed/server/raft_server.go')
-rw-r--r--weed/server/raft_server.go85
1 files changed, 27 insertions, 58 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 66a6734e3..073c1ff16 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,10 +2,8 @@ package weed_server
import (
"encoding/json"
- "io/ioutil"
"os"
"path"
- "reflect"
"sort"
"time"
@@ -80,11 +78,6 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
return nil, err
}
- // Clear old cluster configurations if peers are changed
- if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
- glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
- }
-
stateMachine := StateMachine{topo: topo}
s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
if err != nil {
@@ -107,19 +100,20 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
}
// Remove deleted peers
- if raftResumeState {
- for existsPeerName, _ := range s.raftServer.Peers() {
- exists := false
- for _, peer := range s.peers {
- if peer == existsPeerName {
- exists = true
- }
+ for existsPeerName := range s.raftServer.Peers() {
+ exists, existingPeer := false, ""
+ for _, peer := range s.peers {
+ if pb.ServerToGrpcAddress(peer) == existsPeerName {
+ exists, existingPeer = true, peer
+ break
}
- if !exists {
- if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
- glog.V(0).Infoln(err)
- return nil, err
- }
+ }
+ if exists {
+ if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
+ glog.V(0).Infoln(err)
+ return nil, err
+ } else {
+ glog.V(0).Infof("removing old peer %s", existingPeer)
}
}
}
@@ -128,17 +122,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
// Initialize the server by joining itself.
- glog.V(0).Infoln("Initializing new cluster")
-
- _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
- Name: s.raftServer.Name(),
- ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
- })
-
- if err != nil {
- glog.V(0).Infoln(err)
- return nil, err
- }
+ // s.DoJoinCommand()
}
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
@@ -156,34 +140,6 @@ func (s *RaftServer) Peers() (members []string) {
return
}
-func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) {
- confPath := path.Join(dir, "conf")
- // open conf file
- b, err := ioutil.ReadFile(confPath)
- if err != nil {
- return oldPeers, true
- }
- conf := &raft.Config{}
- if err = json.Unmarshal(b, conf); err != nil {
- return oldPeers, true
- }
-
- for _, p := range conf.Peers {
- oldPeers = append(oldPeers, p.Name)
- }
- oldPeers = append(oldPeers, self)
-
- if len(peers) == 0 && len(oldPeers) <= 1 {
- return oldPeers, false
- }
-
- sort.Strings(peers)
- sort.Strings(oldPeers)
-
- return oldPeers, !reflect.DeepEqual(peers, oldPeers)
-
-}
-
func isTheFirstOne(self string, peers []string) bool {
sort.Strings(peers)
if len(peers) <= 0 {
@@ -191,3 +147,16 @@ func isTheFirstOne(self string, peers []string) bool {
}
return self == peers[0]
}
+
+func (s *RaftServer) DoJoinCommand() {
+
+ glog.V(0).Infoln("Initializing new cluster")
+
+ if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
+ Name: s.raftServer.Name(),
+ ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
+ }); err != nil {
+ glog.Errorf("fail to send join command: %v", err)
+ }
+
+}