diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-10-07 01:25:39 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-10-07 01:25:39 -0700 |
| commit | da4edf3651d0dd17570057388e5e012eeda4ff80 (patch) | |
| tree | 8122c14acacf89d66dec12f97c4a16075e5b2588 /weed/server | |
| parent | c543762e23d5f38dc90365f6e9c04abc5aa5c0d4 (diff) | |
| download | seaweedfs-da4edf3651d0dd17570057388e5e012eeda4ff80.tar.xz seaweedfs-da4edf3651d0dd17570057388e5e012eeda4ff80.zip | |
master: check peers for existing leader before starting a leader election
fix https://github.com/chrislusf/seaweedfs/issues/1509
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/master_grpc_server.go | 17 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 13 | ||||
| -rw-r--r-- | weed/server/master_server.go | 5 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 85 |
4 files changed, 45 insertions, 75 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 692909a29..e8fa3995d 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "net" "strings" "time" @@ -302,3 +303,19 @@ func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.Li } return resp, nil } + +func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { + + // tell the volume servers about the leader + leader, _ := ms.Topo.Leader() + + resp := &master_pb.GetMasterConfigurationResponse{ + MetricsAddress: ms.option.MetricsAddress, + MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + StorageBackends: backend.ToPbStorageBackends(), + DefaultReplication: ms.option.DefaultReplicaPlacement, + Leader: leader, + } + + return resp, nil +} diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 6a320dfb1..7e12b8cd8 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -3,8 +3,6 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/backend" - "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -180,14 +178,3 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku return resp, nil } -func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { - - resp := &master_pb.GetMasterConfigurationResponse{ - MetricsAddress: ms.option.MetricsAddress, - MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), - StorageBackends: backend.ToPbStorageBackends(), - DefaultReplication: ms.option.DefaultReplicaPlacement, - } - - return resp, nil -} diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 657b170c2..cc1c4b2ad 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -138,14 +138,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { ms.Topo.RaftServer = raftServer.raftServer ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infof("event: %+v", e) + glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) if ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") } }) - ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) { - glog.V(0).Infof("state change: %+v", e) - }) if ms.Topo.IsLeader() { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") } else { diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 66a6734e3..073c1ff16 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,10 +2,8 @@ package weed_server import ( "encoding/json" - "io/ioutil" "os" "path" - "reflect" "sort" "time" @@ -80,11 +78,6 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d return nil, err } - // Clear old cluster configurations if peers are changed - if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed { - glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers) - } - stateMachine := StateMachine{topo: topo} s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "") if err != nil { @@ -107,19 +100,20 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d } // Remove deleted peers - if raftResumeState { - for existsPeerName, _ := range s.raftServer.Peers() { - exists := false - for _, peer := range s.peers { - if peer == existsPeerName { - exists = true - } + for existsPeerName := range s.raftServer.Peers() { + exists, existingPeer := false, "" + for _, peer := range s.peers { + if pb.ServerToGrpcAddress(peer) == existsPeerName { + exists, existingPeer = true, peer + break } - if !exists { - if err := s.raftServer.RemovePeer(existsPeerName); err != nil { - glog.V(0).Infoln(err) - return nil, err - } + } + if exists { + if err := s.raftServer.RemovePeer(existsPeerName); err != nil { + glog.V(0).Infoln(err) + return nil, err + } else { + glog.V(0).Infof("removing old peer %s", existingPeer) } } } @@ -128,17 +122,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { // Initialize the server by joining itself. - glog.V(0).Infoln("Initializing new cluster") - - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), - }) - - if err != nil { - glog.V(0).Infoln(err) - return nil, err - } + // s.DoJoinCommand() } glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) @@ -156,34 +140,6 @@ func (s *RaftServer) Peers() (members []string) { return } -func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) { - confPath := path.Join(dir, "conf") - // open conf file - b, err := ioutil.ReadFile(confPath) - if err != nil { - return oldPeers, true - } - conf := &raft.Config{} - if err = json.Unmarshal(b, conf); err != nil { - return oldPeers, true - } - - for _, p := range conf.Peers { - oldPeers = append(oldPeers, p.Name) - } - oldPeers = append(oldPeers, self) - - if len(peers) == 0 && len(oldPeers) <= 1 { - return oldPeers, false - } - - sort.Strings(peers) - sort.Strings(oldPeers) - - return oldPeers, !reflect.DeepEqual(peers, oldPeers) - -} - func isTheFirstOne(self string, peers []string) bool { sort.Strings(peers) if len(peers) <= 0 { @@ -191,3 +147,16 @@ func isTheFirstOne(self string, peers []string) bool { } return self == peers[0] } + +func (s *RaftServer) DoJoinCommand() { + + glog.V(0).Infoln("Initializing new cluster") + + if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), + }); err != nil { + glog.Errorf("fail to send join command: %v", err) + } + +} |
