diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-10-07 01:25:39 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-10-07 01:25:39 -0700 |
| commit | da4edf3651d0dd17570057388e5e012eeda4ff80 (patch) | |
| tree | 8122c14acacf89d66dec12f97c4a16075e5b2588 /weed/server/raft_server.go | |
| parent | c543762e23d5f38dc90365f6e9c04abc5aa5c0d4 (diff) | |
| download | seaweedfs-da4edf3651d0dd17570057388e5e012eeda4ff80.tar.xz seaweedfs-da4edf3651d0dd17570057388e5e012eeda4ff80.zip | |
master: check peers for existing leader before starting a leader election
fix https://github.com/chrislusf/seaweedfs/issues/1509
Diffstat (limited to 'weed/server/raft_server.go')
| -rw-r--r-- | weed/server/raft_server.go | 85 |
1 files changed, 27 insertions, 58 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 66a6734e3..073c1ff16 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,10 +2,8 @@ package weed_server import ( "encoding/json" - "io/ioutil" "os" "path" - "reflect" "sort" "time" @@ -80,11 +78,6 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d return nil, err } - // Clear old cluster configurations if peers are changed - if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed { - glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers) - } - stateMachine := StateMachine{topo: topo} s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "") if err != nil { @@ -107,19 +100,20 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d } // Remove deleted peers - if raftResumeState { - for existsPeerName, _ := range s.raftServer.Peers() { - exists := false - for _, peer := range s.peers { - if peer == existsPeerName { - exists = true - } + for existsPeerName := range s.raftServer.Peers() { + exists, existingPeer := false, "" + for _, peer := range s.peers { + if pb.ServerToGrpcAddress(peer) == existsPeerName { + exists, existingPeer = true, peer + break } - if !exists { - if err := s.raftServer.RemovePeer(existsPeerName); err != nil { - glog.V(0).Infoln(err) - return nil, err - } + } + if exists { + if err := s.raftServer.RemovePeer(existsPeerName); err != nil { + glog.V(0).Infoln(err) + return nil, err + } else { + glog.V(0).Infof("removing old peer %s", existingPeer) } } } @@ -128,17 +122,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { // Initialize the server by joining itself. - glog.V(0).Infoln("Initializing new cluster") - - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), - }) - - if err != nil { - glog.V(0).Infoln(err) - return nil, err - } + // s.DoJoinCommand() } glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) @@ -156,34 +140,6 @@ func (s *RaftServer) Peers() (members []string) { return } -func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) { - confPath := path.Join(dir, "conf") - // open conf file - b, err := ioutil.ReadFile(confPath) - if err != nil { - return oldPeers, true - } - conf := &raft.Config{} - if err = json.Unmarshal(b, conf); err != nil { - return oldPeers, true - } - - for _, p := range conf.Peers { - oldPeers = append(oldPeers, p.Name) - } - oldPeers = append(oldPeers, self) - - if len(peers) == 0 && len(oldPeers) <= 1 { - return oldPeers, false - } - - sort.Strings(peers) - sort.Strings(oldPeers) - - return oldPeers, !reflect.DeepEqual(peers, oldPeers) - -} - func isTheFirstOne(self string, peers []string) bool { sort.Strings(peers) if len(peers) <= 0 { @@ -191,3 +147,16 @@ func isTheFirstOne(self string, peers []string) bool { } return self == peers[0] } + +func (s *RaftServer) DoJoinCommand() { + + glog.V(0).Infoln("Initializing new cluster") + + if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), + }); err != nil { + glog.Errorf("fail to send join command: %v", err) + } + +} |
