diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-06-12 01:54:09 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-06-12 01:54:09 -0700 |
| commit | 03f50180f3f6bfca5ae61fab0fa1c3b1db939e05 (patch) | |
| tree | 35a236730feafdc8d651d67f996593ac6ffe3ffe /weed/server/raft_server.go | |
| parent | 69b4f9383086f71a04a28d358d1ccca1cc149ff2 (diff) | |
| download | seaweedfs-03f50180f3f6bfca5ae61fab0fa1c3b1db939e05.tar.xz seaweedfs-03f50180f3f6bfca5ae61fab0fa1c3b1db939e05.zip | |
simplifying the leader election by raft
fixing https://github.com/chrislusf/seaweedfs/issues/629
Diffstat (limited to 'weed/server/raft_server.go')
| -rw-r--r-- | weed/server/raft_server.go | 107 |
1 files changed, 8 insertions, 99 deletions
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 61adcdc59..e2a091e83 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -1,14 +1,8 @@ package weed_server import ( - "bytes" "encoding/json" - "errors" - "fmt" "io/ioutil" - "math/rand" - "net/http" - "net/url" "os" "path" "reflect" @@ -49,7 +43,7 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin var err error transporter := raft.NewHTTPTransporter("/cluster", 0) transporter.Transport.MaxIdleConnsPerHost = 1024 - glog.V(1).Infof("Starting RaftServer with IP:%v:", httpAddr) + glog.V(0).Infof("Starting RaftServer with %v", httpAddr) // Clear old cluster configurations if peers are changed if oldPeers, changed := isPeersChanged(s.dataDir, httpAddr, s.peers); changed { @@ -69,31 +63,13 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond) s.raftServer.Start() - s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST") s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET") - if len(s.peers) > 0 { - // Join to leader if specified. - for { - glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - firstJoinError := s.Join(s.peers) - if firstJoinError != nil { - glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.") - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, - }) - if err != nil { - glog.V(0).Infoln(err) - } else { - break - } - } else { - break - } - } - } else if s.raftServer.IsLogEmpty() { + for _, peer := range s.peers { + s.raftServer.AddPeer(peer, "http://"+peer) + } + time.Sleep(2 * time.Second) + if s.raftServer.IsLogEmpty() { // Initialize the server by joining itself. glog.V(0).Infoln("Initializing new cluster") @@ -106,11 +82,10 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin glog.V(0).Infoln(err) return nil } - - } else { - glog.V(0).Infoln("Old conf,log,snapshot should have been removed.") } + glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) + return s } @@ -151,69 +126,3 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, return oldPeers, !reflect.DeepEqual(peers, oldPeers) } - -// Join joins an existing cluster. -func (s *RaftServer) Join(peers []string) error { - command := &raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, - } - - var err error - var b bytes.Buffer - json.NewEncoder(&b).Encode(command) - for _, m := range peers { - if m == s.httpAddr { - continue - } - target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m)) - glog.V(0).Infoln("Attempting to connect to:", target) - - err = postFollowingOneRedirect(target, "application/json", b) - - if err != nil { - glog.V(0).Infoln("Post returned error: ", err.Error()) - if _, ok := err.(*url.Error); ok { - // If we receive a network error try the next member - continue - } - } else { - return nil - } - } - - return errors.New("Could not connect to any cluster peers") -} - -// a workaround because http POST following redirection misses request body -func postFollowingOneRedirect(target string, contentType string, b bytes.Buffer) error { - backupReader := bytes.NewReader(b.Bytes()) - resp, err := http.Post(target, contentType, &b) - if err != nil { - return err - } - defer resp.Body.Close() - statusCode := resp.StatusCode - data, _ := ioutil.ReadAll(resp.Body) - reply := string(data) - - if strings.HasPrefix(reply, "\"http") { - urlStr := reply[1 : len(reply)-1] - - glog.V(0).Infoln("Post redirected to ", urlStr) - resp2, err2 := http.Post(urlStr, contentType, backupReader) - if err2 != nil { - return err2 - } - defer resp2.Body.Close() - data, _ = ioutil.ReadAll(resp2.Body) - statusCode = resp2.StatusCode - } - - glog.V(0).Infoln("Post returned status: ", statusCode, string(data)) - if statusCode != http.StatusOK { - return errors.New(string(data)) - } - - return nil -} |
